Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,7 @@ type netProxy struct {
down int
url string
surl string
port int
}

func newNetProxy(rtt time.Duration, upRate, downRate int, serverURL string) *netProxy {
Expand All @@ -1895,14 +1896,27 @@ func createNetProxy(rtt time.Duration, upRate, downRate int, serverURL string, s
if e != nil {
panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
}
u, err := url.Parse(serverURL)
if err != nil {
panic(fmt.Sprintf("Could not parse server URL: %v", err))
}

var clientURL string
port := l.Addr().(*net.TCPAddr).Port
if u.User != nil {
clientURL = fmt.Sprintf("nats://%v@127.0.0.1:%d", u.User, port)
} else {
clientURL = fmt.Sprintf("nats://127.0.0.1:%d", port)
}

proxy := &netProxy{
listener: l,
rtt: rtt,
up: upRate,
down: downRate,
url: fmt.Sprintf("nats://127.0.0.1:%d", port),
url: clientURL,
surl: serverURL,
port: port,
}
if start {
proxy.start()
Expand All @@ -1917,6 +1931,16 @@ func (np *netProxy) start() {
}
host := u.Host

// Check if this is a restart.
// We nil out the listener on stop().
if np.listener == nil && np.port != 0 {
hp := net.JoinHostPort("127.0.0.1", fmt.Sprintf("%d", np.port))
np.listener, err = net.Listen("tcp", hp)
if err != nil {
panic(fmt.Sprintf("Error listening on port: %s, %q", hp, err))
}
}

go func() {
for {
client, err := np.listener.Accept()
Expand All @@ -1942,6 +1966,10 @@ func (np *netProxy) routeURL() string {
return strings.Replace(np.url, "nats", "nats-route", 1)
}

// leafURL returns the proxy's client URL rewritten for use as a leafnode
// remote: the first occurrence of "nats" in np.url is replaced by "nats-leaf".
func (np *netProxy) leafURL() string {
	const (
		clientScheme = "nats"
		leafScheme   = "nats-leaf"
	)
	// Only the first match is rewritten, so any later "nats" in the URL
	// (for example in the user info) is left untouched.
	return strings.Replace(np.url, clientScheme, leafScheme, 1)
}

func (np *netProxy) loop(tbw int, r, w net.Conn) {
const rbl = 8192
var buf [rbl]byte
Expand Down
246 changes: 246 additions & 0 deletions server/jetstream_leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,3 +1426,249 @@ func TestJetStreamLeafNodeJSClusterMigrateRecoveryWithDelay(t *testing.T) {
// long election timer. Now this should work reliably.
lnc.waitOnStreamLeader(globalAccountName, "TEST")
}

// This will test that when a mirror or source construct is setup across a leafnode/domain
// that it will recover quickly once the LN is re-established regardless
// of backoff state of the internal consumer create.
func TestJetStreamLeafNodeAndMirrorResyncAfterConnectionDown(t *testing.T) {
tmplA := `
listen: -1
server_name: tcm
jetstream {
store_dir: '%s',
domain: TCM
}
accounts {
JS { users = [ { user: "y", pass: "p" } ]; jetstream: true }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
leaf { port: -1 }
`
confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, t.TempDir())))
sA, oA := RunServerWithConfig(confA)
defer sA.Shutdown()

// Create a proxy - we will use this to simulate a network down event.
rtt, bw := 10*time.Microsecond, 10*1024*1024*1024
proxy := newNetProxy(rtt, bw, bw, fmt.Sprintf("nats://y:p@127.0.0.1:%d", oA.LeafNode.Port))
defer proxy.stop()

tmplB := `
listen: -1
server_name: xmm
jetstream {
store_dir: '%s',
domain: XMM
}
accounts {
JS { users = [ { user: "y", pass: "p" } ]; jetstream: true }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
leaf { remotes [ { url: %s, account: "JS" } ], reconnect: "0.25s" }
`

confB := createConfFile(t, []byte(fmt.Sprintf(tmplB, t.TempDir(), proxy.leafURL())))
sB, _ := RunServerWithConfig(confB)
defer sA.Shutdown()

// Make sure we are connected ok.
checkLeafNodeConnectedCount(t, sA, 1)
checkLeafNodeConnectedCount(t, sB, 1)

// We will have 3 streams that we will test for proper syncing after
// the network is restored.
//
// 1. Mirror A --> B
// 2. Mirror A <-- B
// 3. Source A <-> B

// Connect to sA.
ncA, jsA := jsClientConnect(t, sA, nats.UserInfo("y", "p"))
defer ncA.Close()

// Connect to sB.
ncB, jsB := jsClientConnect(t, sB, nats.UserInfo("y", "p"))
defer ncB.Close()

// Add in TEST-A
_, err := jsA.AddStream(&nats.StreamConfig{Name: "TEST-A", Subjects: []string{"foo"}})
require_NoError(t, err)

// Add in TEST-B
_, err = jsB.AddStream(&nats.StreamConfig{Name: "TEST-B", Subjects: []string{"bar"}})
require_NoError(t, err)

// Now setup mirrors.
_, err = jsB.AddStream(&nats.StreamConfig{
Name: "M-A",
Mirror: &nats.StreamSource{
Name: "TEST-A",
External: &nats.ExternalStream{APIPrefix: "$JS.TCM.API"},
},
})
require_NoError(t, err)

_, err = jsA.AddStream(&nats.StreamConfig{
Name: "M-B",
Mirror: &nats.StreamSource{
Name: "TEST-B",
External: &nats.ExternalStream{APIPrefix: "$JS.XMM.API"},
},
})
require_NoError(t, err)

// Now add in the streams that will source from one another bi-directionally.
_, err = jsA.AddStream(&nats.StreamConfig{
Name: "SRC-A",
Subjects: []string{"A.*"},
Sources: []*nats.StreamSource{{
Name: "SRC-B",
FilterSubject: "B.*",
External: &nats.ExternalStream{APIPrefix: "$JS.XMM.API"},
}},
})
require_NoError(t, err)

_, err = jsB.AddStream(&nats.StreamConfig{
Name: "SRC-B",
Subjects: []string{"B.*"},
Sources: []*nats.StreamSource{{
Name: "SRC-A",
FilterSubject: "A.*",
External: &nats.ExternalStream{APIPrefix: "$JS.TCM.API"},
}},
})
require_NoError(t, err)

// Now load them up with 500 messages.
initMsgs := 500
for i := 0; i < initMsgs; i++ {
// Individual Streams
jsA.PublishAsync("foo", []byte("PAYLOAD"))
jsB.PublishAsync("bar", []byte("PAYLOAD"))
// Bi-directional Sources
jsA.PublishAsync("A.foo", []byte("PAYLOAD"))
jsB.PublishAsync("B.bar", []byte("PAYLOAD"))
}
select {
case <-jsA.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
select {
case <-jsB.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Utility to check the number of stream msgs.
checkStreamMsgs := func(js nats.JetStreamContext, sname string, expected int, perr error) error {
t.Helper()
if perr != nil {
return perr
}
si, err := js.StreamInfo(sname)
require_NoError(t, err)
if si.State.Msgs != uint64(expected) {
return fmt.Errorf("Expected %d msgs for %s, got state: %+v", expected, sname, si.State)
}
return nil
}

// Wait til we see all messages.
checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
err := checkStreamMsgs(jsA, "TEST-A", initMsgs, nil)
err = checkStreamMsgs(jsB, "M-A", initMsgs, err)
err = checkStreamMsgs(jsB, "TEST-B", initMsgs, err)
err = checkStreamMsgs(jsA, "M-B", initMsgs, err)
err = checkStreamMsgs(jsA, "SRC-A", initMsgs*2, err)
err = checkStreamMsgs(jsB, "SRC-B", initMsgs*2, err)
return err
})

// Take down proxy. This will stop any propagation of messages between TEST and M streams.
proxy.stop()

// Now add an additional 500 messages to originals on both sides.
for i := 0; i < initMsgs; i++ {
// Individual Streams
jsA.PublishAsync("foo", []byte("PAYLOAD"))
jsB.PublishAsync("bar", []byte("PAYLOAD"))
// Bi-directional Sources
jsA.PublishAsync("A.foo", []byte("PAYLOAD"))
jsB.PublishAsync("B.bar", []byte("PAYLOAD"))
}
select {
Comment thread
derekcollison marked this conversation as resolved.
case <-jsA.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
select {
case <-jsB.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

cancelAndDelayConsumer := func(s *Server, stream string) {
// Now make sure internal consumer is at max backoff.
acc, err := s.lookupAccount("JS")
require_NoError(t, err)
mset, err := acc.lookupStream(stream)
require_NoError(t, err)

// Reset sourceInfo to have lots of failures and last attempt 2 minutes ago.
// Lock should be held on parent stream.
resetSourceInfo := func(si *sourceInfo) {
si.sip = false
si.fails = 100
si.lreq = time.Now().Add(-2 * time.Minute)
}

// Force the consumer to be canceled and we simulate 100 failed attempts
// such that the next time we will try will be a long way out.
mset.mu.Lock()
if mset.mirror != nil {
resetSourceInfo(mset.mirror)
mset.cancelSourceInfo(mset.mirror)
mset.scheduleSetupMirrorConsumerRetry()
} else if len(mset.sources) > 0 {
for iname, si := range mset.sources {
resetSourceInfo(si)
mset.cancelSourceInfo(si)
mset.setupSourceConsumer(iname, si.sseq+1, time.Time{})
}
}
mset.mu.Unlock()
}

// Mirrors
cancelAndDelayConsumer(sA, "M-B")
cancelAndDelayConsumer(sB, "M-A")
// Now bi-directional sourcing
cancelAndDelayConsumer(sA, "SRC-A")
cancelAndDelayConsumer(sB, "SRC-B")

// Now restart the network proxy.
proxy.start()

// Make sure we are connected ok.
checkLeafNodeConnectedCount(t, sA, 1)
checkLeafNodeConnectedCount(t, sB, 1)

// These should be good before re-sync.
require_NoError(t, checkStreamMsgs(jsA, "TEST-A", initMsgs*2, nil))
require_NoError(t, checkStreamMsgs(jsB, "TEST-B", initMsgs*2, nil))

start := time.Now()
// Wait til we see all messages.
checkFor(t, 2*time.Minute, 50*time.Millisecond, func() error {
err := checkStreamMsgs(jsA, "M-B", initMsgs*2, err)
err = checkStreamMsgs(jsB, "M-A", initMsgs*2, err)
err = checkStreamMsgs(jsA, "SRC-A", initMsgs*4, err)
err = checkStreamMsgs(jsB, "SRC-B", initMsgs*4, err)
return err
})
if elapsed := time.Since(start); elapsed > 2*time.Second {
t.Fatalf("Expected to resync all streams <2s but got %v", elapsed)
}
}
Loading