Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 209 additions & 3 deletions server/jetstream_leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package server

import (
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -1619,7 +1620,7 @@ func TestJetStreamLeafNodeAndMirrorResyncAfterConnectionDown(t *testing.T) {
// Reset sourceInfo to have lots of failures and last attempt 2 minutes ago.
// Lock should be held on parent stream.
resetSourceInfo := func(si *sourceInfo) {
si.sip = false
// Do not reset sip here to make sure that the internal logic clears.
si.fails = 100
si.lreq = time.Now().Add(-2 * time.Minute)
}
Expand Down Expand Up @@ -1668,7 +1669,212 @@ func TestJetStreamLeafNodeAndMirrorResyncAfterConnectionDown(t *testing.T) {
err = checkStreamMsgs(jsB, "SRC-B", initMsgs*4, err)
return err
})
if elapsed := time.Since(start); elapsed > 2*time.Second {
t.Fatalf("Expected to resync all streams <2s but got %v", elapsed)
if elapsed := time.Since(start); elapsed > 3*time.Second {
t.Fatalf("Expected to resync all streams <3s but got %v", elapsed)
}
}

// This test will test a 3 node setup where we have a hub node, a gateway node, and a satellite node.
// This is specifically testing re-sync when there is not a direct Domain with JS match for the first
// hop connect LN that is signaling.
//
// HUB <---- GW(+JS/DOMAIN) -----> SAT1
// ^
// |
// +------- GW(-JS/NO DOMAIN) --> SAT2
//
// The Gateway node will solicit the satellites but will act as a LN hub.
//
// GW1 runs JetStream with its own domain (GW); GW2 runs without JetStream and
// without a domain. Both variants must still cause the satellites' sourcing
// consumers to be kicked and re-synced when the leafnode links come back.
func TestJetStreamLeafNodeAndMirrorResyncAfterLeafEstablished(t *testing.T) {
	// Shared accounts block: one JetStream-enabled user account and the system account.
accs := `
accounts {
JS { users = [ { user: "u", pass: "p" } ]; jetstream: true }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
	// Hub server: JetStream domain HUB, accepts leafnode connections.
hubT := `
listen: -1
server_name: hub
jetstream { store_dir: '%s', domain: HUB }
%s
leaf { port: -1 }
`
confA := createConfFile(t, []byte(fmt.Sprintf(hubT, t.TempDir(), accs)))
sHub, oHub := RunServerWithConfig(confA)
defer sHub.Shutdown()

// We run the SAT node second to extract out info for solicitation from targeted GW.
	// Satellite 1: its own JetStream domain SAT1, also accepts leafnode connections.
sat1T := `
listen: -1
server_name: sat1
jetstream { store_dir: '%s', domain: SAT1 }
%s
leaf { port: -1 }
`
confB := createConfFile(t, []byte(fmt.Sprintf(sat1T, t.TempDir(), accs)))
sSat1, oSat1 := RunServerWithConfig(confB)
defer sSat1.Shutdown()

	// Satellite 2: same shape as satellite 1 but in domain SAT2.
sat2T := `
listen: -1
server_name: sat2
jetstream { store_dir: '%s', domain: SAT2 }
%s
leaf { port: -1 }
`
confC := createConfFile(t, []byte(fmt.Sprintf(sat2T, t.TempDir(), accs)))
sSat2, oSat2 := RunServerWithConfig(confC)
defer sSat2.Shutdown()

	// Leafnode solicitation URLs for the dynamically assigned ports above.
hubLeafPort := fmt.Sprintf("nats://u:p@127.0.0.1:%d", oHub.LeafNode.Port)
sat1LeafPort := fmt.Sprintf("nats://u:p@127.0.0.1:%d", oSat1.LeafNode.Port)
sat2LeafPort := fmt.Sprintf("nats://u:p@127.0.0.1:%d", oSat2.LeafNode.Port)

	// Gateway 1: has JetStream (domain GW). Solicits the hub normally and
	// solicits SAT1 with hub:true, i.e. it acts as a LN hub toward the satellite.
	// Short reconnect so restarts in testReconnect re-establish quickly.
gw1T := `
listen: -1
server_name: gw1
jetstream { store_dir: '%s', domain: GW }
%s
leaf { remotes [ { url: %s, account: "JS" }, { url: %s, account: "JS", hub: true } ], reconnect: "0.25s" }
`
confD := createConfFile(t, []byte(fmt.Sprintf(gw1T, t.TempDir(), accs, hubLeafPort, sat1LeafPort)))
sGW1, _ := RunServerWithConfig(confD)
defer sGW1.Shutdown()

	// Gateway 2: no JetStream block and no domain, and its JS account has no
	// jetstream enabled either. This is the "-JS/NO DOMAIN" leg of the diagram.
gw2T := `
listen: -1
server_name: gw2
accounts {
JS { users = [ { user: "u", pass: "p" } ] }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
leaf { remotes [ { url: %s, account: "JS" }, { url: %s, account: "JS", hub: true } ], reconnect: "0.25s" }
`
confE := createConfFile(t, []byte(fmt.Sprintf(gw2T, hubLeafPort, sat2LeafPort)))
sGW2, _ := RunServerWithConfig(confE)
defer sGW2.Shutdown()

// Make sure we are connected ok.
checkLeafNodeConnectedCount(t, sHub, 2)
checkLeafNodeConnectedCount(t, sSat1, 1)
checkLeafNodeConnectedCount(t, sSat2, 1)
checkLeafNodeConnectedCount(t, sGW1, 2)
checkLeafNodeConnectedCount(t, sGW2, 2)

// Let's place a muxed stream on the hub and have it source from a stream on the Satellite.
// Connect to Hub.
ncHub, jsHub := jsClientConnect(t, sHub, nats.UserInfo("u", "p"))
defer ncHub.Close()

_, err := jsHub.AddStream(&nats.StreamConfig{Name: "HUB", Subjects: []string{"H.>"}})
require_NoError(t, err)

// Connect to Sat1.
ncSat1, jsSat1 := jsClientConnect(t, sSat1, nats.UserInfo("u", "p"))
defer ncSat1.Close()

	// SAT-1 sources its slice of the HUB stream across the leafnode hops,
	// addressing the hub's JS API through the HUB domain prefix.
_, err = jsSat1.AddStream(&nats.StreamConfig{
Name: "SAT-1",
Subjects: []string{"S1.*"},
Sources: []*nats.StreamSource{{
Name: "HUB",
FilterSubject: "H.SAT-1.>",
External: &nats.ExternalStream{APIPrefix: "$JS.HUB.API"},
}},
})
require_NoError(t, err)

// Connect to Sat2.
ncSat2, jsSat2 := jsClientConnect(t, sSat2, nats.UserInfo("u", "p"))
defer ncSat2.Close()

	// SAT-2 mirrors the same pattern via the JetStream-less gateway leg.
_, err = jsSat2.AddStream(&nats.StreamConfig{
Name: "SAT-2",
Subjects: []string{"S2.*"},
Sources: []*nats.StreamSource{{
Name: "HUB",
FilterSubject: "H.SAT-2.>",
External: &nats.ExternalStream{APIPrefix: "$JS.HUB.API"},
}},
})
require_NoError(t, err)

// Put in 10 msgs each in for each satellite.
	// NOTE(review): publish errors are deliberately ignored; the checkFor below
	// verifies delivery end-to-end.
for i := 0; i < 10; i++ {
jsHub.Publish("H.SAT-1.foo", []byte("CMD"))
jsHub.Publish("H.SAT-2.foo", []byte("CMD"))
}
// Make sure both are sync'd.
checkFor(t, time.Second, 100*time.Millisecond, func() error {
si, err := jsSat1.StreamInfo("SAT-1")
require_NoError(t, err)
if si.State.Msgs != 10 {
return errors.New("SAT-1 Not sync'd yet")
}
si, err = jsSat2.StreamInfo("SAT-2")
require_NoError(t, err)
if si.State.Msgs != 10 {
return errors.New("SAT-2 Not sync'd yet")
}
return nil
})

	// testReconnect takes both gateways down for `delay`, publishes 10 more
	// messages per satellite while the links are severed, restarts the gateways
	// and requires both satellite streams to catch up to `expected` messages
	// within ~2s. It reassigns the outer sGW1/sGW2 so the trailing defers
	// shut down the restarted instances.
testReconnect := func(t *testing.T, delay time.Duration, expected uint64) {
// Now disconnect Sat1 and Sat2. In 2.12 we can do this with active: false, but since this will be
// pulled into 2.11.9 just shutdown both gateways.
sGW1.Shutdown()
checkLeafNodeConnectedCount(t, sSat1, 0)
checkLeafNodeConnectedCount(t, sHub, 1)

sGW2.Shutdown()
checkLeafNodeConnectedCount(t, sSat2, 0)
checkLeafNodeConnectedCount(t, sHub, 0)

// Send 10 more messages for each while GW1 and GW2 are down.
for i := 0; i < 10; i++ {
jsHub.Publish("H.SAT-1.foo", []byte("CMD"))
jsHub.Publish("H.SAT-2.foo", []byte("CMD"))
}

// Keep GWs down for delay.
time.Sleep(delay)

sGW1, _ = RunServerWithConfig(confD)
// Make sure we are connected ok.
checkLeafNodeConnectedCount(t, sHub, 1)
checkLeafNodeConnectedCount(t, sSat1, 1)
checkLeafNodeConnectedCount(t, sGW1, 2)

sGW2, _ = RunServerWithConfig(confE)
// Make sure we are connected ok.
checkLeafNodeConnectedCount(t, sHub, 2)
checkLeafNodeConnectedCount(t, sSat2, 1)
checkLeafNodeConnectedCount(t, sGW2, 2)

// Make sure sync'd in less than a second or two.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := jsSat1.StreamInfo("SAT-1")
require_NoError(t, err)
if si.State.Msgs != expected {
return fmt.Errorf("SAT-1 not sync'd, expected %d got %d", expected, si.State.Msgs)
}
si, err = jsSat2.StreamInfo("SAT-2")
require_NoError(t, err)
if si.State.Msgs != expected {
return fmt.Errorf("SAT-2 not sync'd, expected %d got %d", expected, si.State.Msgs)
}
return nil
})
}

// We will test two scenarios with amount of time the GWs (link) is down.
// 1. Just a second, we will not have detected the consumer is offline as of yet.
// 2. Just over sourceHealthCheckInterval, meaning we detect it is down and schedule for another try.
t.Run(fmt.Sprintf("reconnect-%v", time.Second), func(t *testing.T) {
testReconnect(t, time.Second, 20)
})
t.Run(fmt.Sprintf("reconnect-%v", sourceHealthCheckInterval+time.Second), func(t *testing.T) {
	// 30 expected: second subtest runs after the first already added 10 msgs.
testReconnect(t, sourceHealthCheckInterval+time.Second, 30)
})
	// Shut down whatever gateway instances testReconnect last started.
defer sGW1.Shutdown()
defer sGW2.Shutdown()
}
27 changes: 13 additions & 14 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
c.setPermissions(perms)
}

var resumeConnect, checkSyncConsumers bool
var resumeConnect bool

// If this is a remote connection and this is the first INFO protocol,
// then we need to finish the connect process by sending CONNECT, etc..
Expand All @@ -1450,7 +1450,6 @@ func (c *client) processLeafnodeInfo(info *Info) {
resumeConnect = true
} else if !firstINFO && didSolicit {
c.leaf.remoteAccName = info.RemoteAccount
checkSyncConsumers = info.JetStream
}

// Check if we have the remote account information and if so make sure it's stored.
Expand All @@ -1470,11 +1469,10 @@ func (c *client) processLeafnodeInfo(info *Info) {
s.leafNodeFinishConnectProcess(c)
}

// If we have JS enabled and so does the other side, we will
// check to see if we need to kick any internal source or mirror consumers.
if checkSyncConsumers {
s.checkInternalSyncConsumers(c.acc, info.Domain)
}
// Check to see if we need to kick any internal source or mirror consumers.
// This will be a no-op if JetStream not enabled for this server or if the bound account
// does not have jetstream.
s.checkInternalSyncConsumers(c.acc)
}

func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
Expand Down Expand Up @@ -2010,16 +2008,16 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
// This will be a no-op as needed.
s.sendLeafNodeConnect(c.acc)

// If we have JS enabled and so does the other side, we will
// check to see if we need to kick any internal source or mirror consumers.
if proto.JetStream {
s.checkInternalSyncConsumers(acc, proto.Domain)
}
// Check to see if we need to kick any internal source or mirror consumers.
// This will be a no-op if JetStream not enabled for this server or if the bound account
// does not have jetstream.
s.checkInternalSyncConsumers(acc)

return nil
}

// checkInternalSyncConsumers
func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
func (s *Server) checkInternalSyncConsumers(acc *Account) {
// Grab our js
js := s.getJetStream()

Expand All @@ -2038,6 +2036,7 @@ func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
if jsa == nil {
return
}

var streams []*stream
jsa.mu.RLock()
for _, mset := range jsa.streams {
Expand All @@ -2055,7 +2054,7 @@ func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
// Now loop through all candidates and check if we are the leader and have NOT
// created the sync up consumer.
for _, mset := range streams {
mset.retryDisconnectedSyncConsumers(remoteDomain)
mset.retryDisconnectedSyncConsumers()
}
}

Expand Down
33 changes: 18 additions & 15 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2616,7 +2616,7 @@ func (mset *stream) mirrorInfo() *StreamSourceInfo {

// retryDisconnectedSyncConsumers() will check if we have any disconnected
// sync consumers for either mirror or a source and will reset and retry to connect.
func (mset *stream) retryDisconnectedSyncConsumers(remoteDomain string) {
func (mset *stream) retryDisconnectedSyncConsumers() {
mset.mu.Lock()
defer mset.mu.Unlock()

Expand All @@ -2625,23 +2625,24 @@ func (mset *stream) retryDisconnectedSyncConsumers(remoteDomain string) {
return
}

shouldRetry := func(si *sourceInfo) bool {
if si != nil && (si.sip || si.sub == nil || (si.sub.client != nil && si.sub.client.isClosed())) {
// Need to reset
si.fails, si.sip = 0, false
mset.cancelSourceInfo(si)
return true
}
return false
}

// Check mirrors first.
if si := mset.mirror; si != nil {
if si.sub == nil && !si.sip {
if remoteDomain == _EMPTY_ || (mset.cfg.Mirror != nil && mset.cfg.Mirror.External.Domain() == remoteDomain) {
// Need to reset
si.fails = 0
mset.cancelSourceInfo(si)
mset.scheduleSetupMirrorConsumerRetry()
}
if shouldRetry(si) {
mset.scheduleSetupMirrorConsumerRetry()
}
} else {
for _, si := range mset.sources {
ss := mset.streamSource(si.iname)
if remoteDomain == _EMPTY_ || (ss != nil && ss.External.Domain() == remoteDomain) {
// Need to reset
si.fails = 0
mset.cancelSourceInfo(si)
if shouldRetry(si) {
mset.setupSourceConsumer(si.iname, si.sseq+1, time.Time{})
}
}
Expand Down Expand Up @@ -3166,7 +3167,8 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.mirror != nil {
mset.mirror.sip = false
// If we need to retry, schedule now
if retry {
// If sub is not nil means we re-established somewhere else so do not re-attempt here.
if retry && mset.mirror.sub == nil {
mset.mirror.fails++
// Cancel here since we can not do anything with this consumer at this point.
mset.cancelSourceInfo(mset.mirror)
Expand Down Expand Up @@ -3524,7 +3526,8 @@ func (mset *stream) trySetupSourceConsumer(iname string, seq uint64, startTime t
if si := mset.sources[iname]; si != nil {
si.sip = false
// If we need to retry, schedule now
if retry {
// If sub is not nil means we re-established somewhere else so do not re-attempt here.
if retry && si.sub == nil {
si.fails++
// Cancel here since we can not do anything with this consumer at this point.
mset.cancelSourceInfo(si)
Expand Down
Loading