Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
if change < 0 {
return
}
entry = &sitally{n: 1, q: sub.queue != nil}
entry = &sitally{n: change, q: sub.queue != nil}
st[string(key)] = entry
first = true
} else {
Expand Down
94 changes: 94 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4964,6 +4964,7 @@ func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testi
%s
}
leafnodes {
reconnect: "10ms"
remotes [
{
url: "nats://user:pwd@127.0.0.1:%d"
Expand Down Expand Up @@ -5178,6 +5179,99 @@ func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testi

ncB1.Close()
checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)

// Now we will create 2 qsubs to sa2 that is still running.
ncA2 = natsConnect(t, sa2.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncA2.Close()
qsub1 := natsQueueSubSync(t, ncA2, "foo", "bar")
qsub2 := natsQueueSubSync(t, ncA2, "foo", "bar")
natsFlush(t, ncA2)
// sa1 is down, so 0, sa2 has 2 queue subs, so1 is down, so 0, so2 is up, so
// 1 for remote interest, same for sb1. Server sb2 is down, so 0, then 1 for
// remote interest for sc and sd. The expected tally for the gateway to "C"
// should be 2.
checkInterest(t, []int{0, 2, 0, 1, 1, 0, 1, 1}, 2)

// Close the leaf connection between sb1 and sa2
sa2.mu.Lock()
for _, l := range sa2.leafs {
l.mu.Lock()
l.nc.Close()
l.mu.Unlock()
}
sa2.mu.Unlock()
time.Sleep(50 * time.Millisecond)
checkLeafNodeConnected(t, sa2)
// Should be the same counts
checkInterest(t, []int{0, 2, 0, 1, 1, 0, 1, 1}, 2)

// Unsubscribe one of the queue sub.
qsub2.Unsubscribe()
natsFlush(t, ncA2)
time.Sleep(50 * time.Millisecond)
// One less for the local interest on sa2 and 1 less for the expected GW count.
checkInterest(t, []int{0, 1, 0, 1, 1, 0, 1, 1}, 1)

// Verify that interest works by publishing from server "C".
ncC := natsConnect(t, sc.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncC.Close()
natsPub(t, ncC, "foo", []byte("hello"))

// Message should be received by qsub1.
natsNexMsg(t, qsub1, time.Second)

// Now stop sa2 and instead start sb2.
ncA2.Close()
sa2.Shutdown()
sa2.WaitForShutdown()

// Wait for counts to go down to 0
checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0)

// Restart sb2 to form route to sb1.
sb2, _ = RunServerWithConfig(sb2Conf)
defer sb2.Shutdown()

checkClusterFormed(t, sb1, sb2)
waitForOutboundGateways(t, sb2, 1, time.Second)

// Create 2 queue subs on sb2.
ncB2 = natsConnect(t, sb2.ClientURL(), nats.UserInfo("user", "pwd"))
defer ncB2.Close()

qsub1 = natsQueueSubSync(t, ncB2, "foo", "bar")
qsub2 = natsQueueSubSync(t, ncB2, "foo", "bar")
natsFlush(t, ncB2)
// sa1 and sa2 are down, so 0, so1 is down, so 0, so2 is up, so 1 for remote interest,
// same for sb1. Server sb2 has 2 local queue subs, so 2. Servers sc and sd have 1 for
// remote interest. The expected tally for the gateway to "C" should be 2.
checkInterest(t, []int{0, 0, 0, 1, 1, 2, 1, 1}, 2)

// Now close the route(s) between sb1 and sb2.
sb2.mu.Lock()
sb2.forEachRoute(func(r *client) {
r.mu.Lock()
r.nc.Close()
r.mu.Unlock()
})
sb2.mu.Unlock()
time.Sleep(50 * time.Millisecond)
checkClusterFormed(t, sb1, sb2)
// Should be the same counts
checkInterest(t, []int{0, 0, 0, 1, 1, 2, 1, 1}, 2)

// Unsubscribe one of the queue sub.
qsub2.Unsubscribe()
natsFlush(t, ncB2)
time.Sleep(50 * time.Millisecond)
// One less for the local interest on sb2 and 1 less for the expected GW count.
checkInterest(t, []int{0, 0, 0, 1, 1, 1, 1, 1}, 1)

// Verify that interest works by publishing from server "C".
natsPub(t, ncC, "foo", []byte("hello"))

// Message should be received by qsub1.
natsNexMsg(t, qsub1, time.Second)
}

func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) {
Expand Down
Loading