diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9e7b9384d62..274634546b6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -8578,10 +8578,13 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // Reset notion of first if this request wants sequences before our starting sequence // and we would have nothing to send. If we have partial messages still need to send skips for those. + // We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state. if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq { s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d", mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq) - sreq.FirstSeq = state.FirstSeq + if state.LastSeq > sreq.LastSeq { + sreq.LastSeq = state.LastSeq + } } // Setup sequences to walk through. @@ -8717,10 +8720,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) - // EOF - s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) - return false + // Check for a condition where our state's first is now past the last that we could have sent. + // If so reset last and continue sending. + var state StreamState + mset.mu.RLock() + mset.store.FastState(&state) + mset.mu.RUnlock() + if last < state.FirstSeq { + last = state.LastSeq + } + // Recheck our exit condition. + if seq == last { + s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) + // EOF + s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) + return false + } } select { case <-remoteQuitCh: diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 7a6f60e6437..8c7e17593fd 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -6586,14 +6586,14 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { } } - // Send first 100 to everyone. + // Send first 1000 to everyone. send1k() // Now shutdown a non-leader. c.waitOnStreamCurrent(nl, "$G", "TEST") nl.Shutdown() - // Send another 100. + // Send another 1000. send1k() // Force snapshot on the leader. @@ -6606,7 +6606,7 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { err = js.PurgeStream("TEST") require_NoError(t, err) - // Send another 100. + // Send another 1000. send1k() // We want to make sure we do not send unnecessary skip msgs when we know we do not have all of these messages. @@ -6630,10 +6630,11 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) { return nil }) - // Make sure we only sent 1 sync catchup msg. + // Make sure we only sent 1002 sync catchup msgs. + // This is for the new messages, the delete range, and the EOF. nmsgs, _, _ := sub.Pending() - if nmsgs != 1 { - t.Fatalf("Expected only 1 sync catchup msg to be sent signaling eof, but got %d", nmsgs) + if nmsgs != 1002 { + t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs) } } diff --git a/server/norace_test.go b/server/norace_test.go index 83c4ea8e077..de85203fa7b 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -9514,3 +9514,88 @@ func TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling(t *testing.T) { return nil }) } + +func TestNoRaceJetStreamKVReplaceWithServerRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomServer()) + defer nc.Close() + // Shorten wait time for disconnects. + js, err := nc.JetStream(nats.MaxWait(time.Second)) + require_NoError(t, err) + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "TEST", + Replicas: 3, + }) + require_NoError(t, err) + + createData := func(n int) []byte { + const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + return b + } + + _, err = kv.Create("foo", createData(160)) + require_NoError(t, err) + + ch := make(chan struct{}) + wg := sync.WaitGroup{} + + // For counting errors that should not happen. + errCh := make(chan error, 1024) + + wg.Add(1) + go func() { + defer wg.Done() + + var lastData []byte + var revision uint64 + + for { + select { + case <-ch: + return + default: + k, err := kv.Get("foo") + if err == nats.ErrKeyNotFound { + errCh <- err + } else if k != nil { + if lastData != nil && k.Revision() == revision && !bytes.Equal(lastData, k.Value()) { + errCh <- fmt.Errorf("data loss [%s][rev:%d] expected:[%q] is:[%q]\n", "foo", revision, lastData, k.Value()) + } + newData := createData(160) + if revision, err = kv.Update("foo", newData, k.Revision()); err == nil { + lastData = newData + } + } + } + } + }() + + // Wait a short bit. + time.Sleep(2 * time.Second) + for _, s := range c.servers { + s.Shutdown() + // Need to leave servers down for awhile to trigger bug properly. + time.Sleep(5 * time.Second) + s = c.restartServer(s) + c.waitOnServerHealthz(s) + } + + // Shutdown the go routine above. + close(ch) + // Wait for it to finish. + wg.Wait() + + if len(errCh) != 0 { + for err := range errCh { + t.Logf("Received err %v during test", err) + } + t.Fatalf("Encountered errors") + } +}