Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,14 @@ type GatewayzOptions struct {

// AccountName will limit the list of accounts to that account name (makes Accounts implicit)
AccountName string `json:"account_name"`

// AccountSubscriptions indicates if subscriptions should be included in the results.
// Note: This is used only if `Accounts` or `AccountName` are specified.
AccountSubscriptions bool `json:"subscriptions"`

// AccountSubscriptionsDetail indicates if subscription details should be included in the results.
// Note: This is used only if `Accounts` or `AccountName` are specified.
AccountSubscriptionsDetail bool `json:"subscriptions_detail"`
}

// Gatewayz represents detailed information on Gateways
Expand All @@ -1902,12 +1910,14 @@ type RemoteGatewayz struct {

// AccountGatewayz represents interest mode for this account
type AccountGatewayz struct {
Name string `json:"name"`
InterestMode string `json:"interest_mode"`
NoInterestCount int `json:"no_interest_count,omitempty"`
InterestOnlyThreshold int `json:"interest_only_threshold,omitempty"`
TotalSubscriptions int `json:"num_subs,omitempty"`
NumQueueSubscriptions int `json:"num_queue_subs,omitempty"`
Name string `json:"name"`
InterestMode string `json:"interest_mode"`
NoInterestCount int `json:"no_interest_count,omitempty"`
InterestOnlyThreshold int `json:"interest_only_threshold,omitempty"`
TotalSubscriptions int `json:"num_subs,omitempty"`
NumQueueSubscriptions int `json:"num_queue_subs,omitempty"`
Subs []string `json:"subscriptions_list,omitempty"`
SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"`
}

// Gatewayz returns a Gatewayz struct containing information about gateways.
Expand Down Expand Up @@ -2033,22 +2043,22 @@ func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*Accou
if !ok {
return nil
}
a := createAccountOutboundGatewayz(accName, ei)
a := createAccountOutboundGatewayz(opts, accName, ei)
return []*AccountGatewayz{a}
}

accs := make([]*AccountGatewayz, 0, 4)
gw.outsim.Range(func(k, v any) bool {
name := k.(string)
a := createAccountOutboundGatewayz(name, v)
a := createAccountOutboundGatewayz(opts, name, v)
accs = append(accs, a)
return true
})
return accs
}

// Returns an AccountGatewayz for this gateway outbound connection
func createAccountOutboundGatewayz(name string, ei any) *AccountGatewayz {
func createAccountOutboundGatewayz(opts *GatewayzOptions, name string, ei any) *AccountGatewayz {
a := &AccountGatewayz{
Name: name,
InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch,
Expand All @@ -2060,6 +2070,23 @@ func createAccountOutboundGatewayz(name string, ei any) *AccountGatewayz {
a.NoInterestCount = len(e.ni)
a.NumQueueSubscriptions = e.qsubs
a.TotalSubscriptions = int(e.sl.Count())
if opts.AccountSubscriptions || opts.AccountSubscriptionsDetail {
var subsa [4096]*subscription
subs := subsa[:0]
e.sl.All(&subs)
if opts.AccountSubscriptions {
a.Subs = make([]string, 0, len(subs))
} else {
a.SubsDetail = make([]SubDetail, 0, len(subs))
}
for _, sub := range subs {
if opts.AccountSubscriptions {
a.Subs = append(a.Subs, string(sub.subject))
} else {
a.SubsDetail = append(a.SubsDetail, newClientSubDetail(sub))
}
}
}
e.RUnlock()
} else {
a.InterestMode = Optimistic.String()
Expand Down Expand Up @@ -2151,6 +2178,10 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) {
s.httpReqStats[GatewayzPath]++
s.mu.Unlock()

subs, subsDet, err := decodeSubs(w, r)
if err != nil {
return
}
accs, err := decodeBool(w, r, "accs")
if err != nil {
return
Expand All @@ -2162,9 +2193,11 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) {
}

opts := &GatewayzOptions{
Name: gwName,
Accounts: accs,
AccountName: accName,
Name: gwName,
Accounts: accs,
AccountName: accName,
AccountSubscriptions: subs,
AccountSubscriptionsDetail: subsDet,
}
gw, err := s.Gatewayz(opts)
if err != nil {
Expand Down
143 changes: 143 additions & 0 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3756,6 +3756,149 @@ func TestMonitorGatewayzAccounts(t *testing.T) {
})
}

func TestMonitorGatewayzWithSubs(t *testing.T) {
resetPreviousHTTPConnections()

ob := testDefaultOptionsForGateway("B")
aA := NewAccount("A")
aB := NewAccount("B")
ob.Accounts = append(ob.Accounts, aA, aB)
ob.Users = append(ob.Users,
&User{Username: "a", Password: "a", Account: aA},
&User{Username: "b", Password: "b", Account: aB})
sb := runGatewayServer(ob)
defer sb.Shutdown()

oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
oa.HTTPHost = "127.0.0.1"
oa.HTTPPort = MONITOR_PORT
aA = NewAccount("A")
aB = NewAccount("B")
oa.Accounts = append(oa.Accounts, aA, aB)
oa.Users = append(oa.Users,
&User{Username: "a", Password: "a", Account: aA},
&User{Username: "b", Password: "b", Account: aB})
sa := runGatewayServer(oa)
defer sa.Shutdown()

waitForOutboundGateways(t, sa, 1, 2*time.Second)
waitForInboundGateways(t, sa, 1, 2*time.Second)

waitForOutboundGateways(t, sb, 1, 2*time.Second)
waitForInboundGateways(t, sb, 1, 2*time.Second)

ncA := natsConnect(t, sb.ClientURL(), nats.UserInfo("a", "a"))
defer ncA.Close()
natsSubSync(t, ncA, "foo")
natsFlush(t, ncA)

ncB := natsConnect(t, sb.ClientURL(), nats.UserInfo("b", "b"))
defer ncB.Close()
natsSubSync(t, ncB, "foo")
natsQueueSubSync(t, ncB, "bar", "baz")
natsFlush(t, ncB)

checkGWInterestOnlyModeInterestOn(t, sa, "B", "A", "foo")
checkGWInterestOnlyModeInterestOn(t, sa, "B", "B", "foo")
checkForRegisteredQSubInterest(t, sa, "B", "B", "bar", 1, time.Second)

for _, test := range []struct {
url string
allAccs bool
opts *GatewayzOptions
}{
{"accs=1&subs=1", true, &GatewayzOptions{Accounts: true, AccountSubscriptions: true}},
{"accs=1&subs=detail", true, &GatewayzOptions{Accounts: true, AccountSubscriptionsDetail: true}},
{"acc_name=B&subs=1", false, &GatewayzOptions{AccountName: "B", AccountSubscriptions: true}},
{"acc_name=B&subs=detail", false, &GatewayzOptions{AccountName: "B", AccountSubscriptionsDetail: true}},
} {
t.Run(test.url, func(t *testing.T) {
gatewayzURL := fmt.Sprintf("http://127.0.0.1:%d/gatewayz?%s", sa.MonitorAddr().Port, test.url)
for pollMode := 0; pollMode < 2; pollMode++ {
gw := pollGatewayz(t, sa, pollMode, gatewayzURL, test.opts)
require_Equal(t, len(gw.OutboundGateways), 1)
ogw, ok := gw.OutboundGateways["B"]
require_True(t, ok)
require_NotNil(t, ogw)
var expected int
if test.allAccs {
expected = 3 // A + B + $G
} else {
expected = 1 // B
}
require_Len(t, len(ogw.Accounts), expected)
accs := map[string]*AccountGatewayz{}
for _, a := range ogw.Accounts {
// Do not include the global account there.
if a.Name == globalAccountName {
continue
}
accs[a.Name] = a
}
// Update the expected number of accounts if we asked for all accounts.
if test.allAccs {
expected--
}
// The account B should always be present.
_, ok = accs["B"]
require_True(t, ok)
if expected == 2 {
_, ok = accs["A"]
require_True(t, ok)
}
// Now that we know we have the proper account(s), check the content.
for n, a := range accs {
require_NotNil(t, a)
require_Equal(t, a.Name, n)
totalSubs := 1
var numQueueSubs int
if n == "B" {
totalSubs++
numQueueSubs = 1
}
require_Equal(t, a.TotalSubscriptions, totalSubs)
require_Equal(t, a.NumQueueSubscriptions, numQueueSubs)

m := map[string]*SubDetail{}
if test.opts.AccountSubscriptions {
require_Len(t, len(a.Subs), totalSubs)
require_Len(t, len(a.SubsDetail), 0)
for _, sub := range a.Subs {
m[sub] = nil
}
} else {
require_Len(t, len(a.Subs), 0)
require_Len(t, len(a.SubsDetail), totalSubs)
for _, sub := range a.SubsDetail {
m[sub.Subject] = &sub
}
}
sd, ok := m["foo"]
require_True(t, ok)
if test.opts.AccountSubscriptionsDetail {
require_NotNil(t, sd)
require_Equal(t, sd.Queue, _EMPTY_)
} else {
require_True(t, sd == nil)
}
sd, ok = m["bar"]
if numQueueSubs == 1 {
require_True(t, ok)
if test.opts.AccountSubscriptionsDetail {
require_NotNil(t, sd)
require_Equal(t, sd.Queue, "baz")
} else {
require_True(t, sd == nil)
}
} else {
require_False(t, ok)
}
}
}
})
}
}

func TestMonitorRoutezRTT(t *testing.T) {
// Do not change default PingInterval and expect RTT to still be reported

Expand Down
1 change: 0 additions & 1 deletion test/ocsp_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,6 @@ func TestOCSPResponseCacheMonitor(t *testing.T) {
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
v := monitorGetVarzHelper(t, 8222)
fmt.Println("Expect:", test.expect)
var ct string
if v.OCSPResponseCache != nil {
ct = v.OCSPResponseCache.Type
Expand Down