diff --git a/server/gateway.go b/server/gateway.go index cfa63f551fc..6257e3f61df 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2412,7 +2412,7 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha if change < 0 { return } - entry = &sitally{n: 1, q: sub.queue != nil} + entry = &sitally{n: change, q: sub.queue != nil} st[string(key)] = entry first = true } else { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e1c2b25a05d..3ba36d9d58e 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4964,6 +4964,7 @@ func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testi %s } leafnodes { + reconnect: "10ms" remotes [ { url: "nats://user:pwd@127.0.0.1:%d" @@ -5178,6 +5179,99 @@ func TestLeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster(t *testi ncB1.Close() checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0) + + // Now we will create 2 qsubs to sa2 that is still running. + ncA2 = natsConnect(t, sa2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncA2.Close() + qsub1 := natsQueueSubSync(t, ncA2, "foo", "bar") + qsub2 := natsQueueSubSync(t, ncA2, "foo", "bar") + natsFlush(t, ncA2) + // sa1 is down, so 0, sa2 has 2 queue subs, so1 is down, so 0, so2 is up, so + // 1 for remote interest, same for sb1. Server sb2 is down, so 0, then 1 for + // remote interest for sc and sd. The expected tally for the gateway to "C" + // should be 2. + checkInterest(t, []int{0, 2, 0, 1, 1, 0, 1, 1}, 2) + + // Close the leaf connection between sb1 and sa2 + sa2.mu.Lock() + for _, l := range sa2.leafs { + l.mu.Lock() + l.nc.Close() + l.mu.Unlock() + } + sa2.mu.Unlock() + time.Sleep(50 * time.Millisecond) + checkLeafNodeConnected(t, sa2) + // Should be the same counts + checkInterest(t, []int{0, 2, 0, 1, 1, 0, 1, 1}, 2) + + // Unsubscribe one of the queue sub. + qsub2.Unsubscribe() + natsFlush(t, ncA2) + time.Sleep(50 * time.Millisecond) + // One less for the local interest on sa2 and 1 less for the expected GW count. + checkInterest(t, []int{0, 1, 0, 1, 1, 0, 1, 1}, 1) + + // Verify that interest works by publishing from server "C". + ncC := natsConnect(t, sc.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncC.Close() + natsPub(t, ncC, "foo", []byte("hello")) + + // Message should be received by qsub1. + natsNexMsg(t, qsub1, time.Second) + + // Now stop sa2 and instead start sb2. + ncA2.Close() + sa2.Shutdown() + sa2.WaitForShutdown() + + // Wait for counts to go down to 0 + checkInterest(t, []int{0, 0, 0, 0, 0, 0, 0, 0}, 0) + + // Restart sb2 to form route to sb1. + sb2, _ = RunServerWithConfig(sb2Conf) + defer sb2.Shutdown() + + checkClusterFormed(t, sb1, sb2) + waitForOutboundGateways(t, sb2, 1, time.Second) + + // Create 2 queue subs on sb2. + ncB2 = natsConnect(t, sb2.ClientURL(), nats.UserInfo("user", "pwd")) + defer ncB2.Close() + + qsub1 = natsQueueSubSync(t, ncB2, "foo", "bar") + qsub2 = natsQueueSubSync(t, ncB2, "foo", "bar") + natsFlush(t, ncB2) + // sa1 and sa2 are down, so 0, so1 is down, so 0, so2 is up, so 1 for remote interest, + // same for sb1. Server sb2 has 2 local queue subs, so 2. Servers sc and sd have 1 for + // remote interest. The expected tally for the gateway to "C" should be 2. + checkInterest(t, []int{0, 0, 0, 1, 1, 2, 1, 1}, 2) + + // Now close the route(s) between sb1 and sb2. + sb2.mu.Lock() + sb2.forEachRoute(func(r *client) { + r.mu.Lock() + r.nc.Close() + r.mu.Unlock() + }) + sb2.mu.Unlock() + time.Sleep(50 * time.Millisecond) + checkClusterFormed(t, sb1, sb2) + // Should be the same counts + checkInterest(t, []int{0, 0, 0, 1, 1, 2, 1, 1}, 2) + + // Unsubscribe one of the queue sub. + qsub2.Unsubscribe() + natsFlush(t, ncB2) + time.Sleep(50 * time.Millisecond) + // One less for the local interest on sb2 and 1 less for the expected GW count. + checkInterest(t, []int{0, 0, 0, 1, 1, 1, 1, 1}, 1) + + // Verify that interest works by publishing from server "C". + natsPub(t, ncC, "foo", []byte("hello")) + + // Message should be received by qsub1. + natsNexMsg(t, qsub1, time.Second) } func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) {