Skip to content
22 changes: 11 additions & 11 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,9 @@ func (s *Server) sendInternalAccountMsg(a *Account, subject string, msg interfac

// Used to send an internal message with an optional reply to an arbitrary account.
func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply string, hdr map[string]string, msg interface{}, echo bool) error {
s.mu.Lock()
s.mu.RLock()
if s.sys == nil || s.sys.sendq == nil {
s.mu.Unlock()
s.mu.RUnlock()
return ErrNoSysAccount
}
c := s.sys.client
Expand All @@ -494,16 +494,16 @@ func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply stri
a.mu.Unlock()
}
s.sys.sendq.push(newPubMsg(c, subject, reply, nil, hdr, msg, noCompression, echo, false))
s.mu.Unlock()
s.mu.RUnlock()
return nil
}

// This will queue up a message to be sent.
// Lock should not be held.
func (s *Server) sendInternalMsgLocked(subj, rply string, si *ServerInfo, msg interface{}) {
s.mu.Lock()
s.mu.RLock()
s.sendInternalMsg(subj, rply, si, msg)
s.mu.Unlock()
s.mu.RUnlock()
}

// This will queue up a message to be sent.
Expand All @@ -517,13 +517,13 @@ func (s *Server) sendInternalMsg(subj, rply string, si *ServerInfo, msg interfac

// Will send an api response.
func (s *Server) sendInternalResponse(subj string, response *ServerAPIResponse) {
s.mu.Lock()
s.mu.RLock()
if s.sys == nil || s.sys.sendq == nil {
s.mu.Unlock()
s.mu.RUnlock()
return
}
s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, response.Server, nil, response, response.compress, false, false))
s.mu.Unlock()
s.mu.RUnlock()
}

// Used to send internal messages from other system clients to avoid no echo issues.
Expand All @@ -535,13 +535,13 @@ func (c *client) sendInternalMsg(subj, rply string, si *ServerInfo, msg interfac
if s == nil {
return
}
s.mu.Lock()
s.mu.RLock()
if s.sys == nil || s.sys.sendq == nil {
s.mu.Unlock()
s.mu.RUnlock()
return
}
s.sys.sendq.push(newPubMsg(c, subj, rply, si, nil, msg, noCompression, false, false))
s.mu.Unlock()
s.mu.RUnlock()
}

// Locked version of checking if events system running. Also checks server.
Expand Down
39 changes: 14 additions & 25 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2533,36 +2533,25 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
return
}

rm := recoveryRemovals{
streams: make(map[string]*streamAssignment),
consumers: make(map[string]*consumerAssignment),
}

js.mu.Lock()
js.mu.RLock()
ns, nc := 0, 0
streams, hasAccount := cc.streams[accName]
if hasAccount {
for _, s := range streams {
key := accName + ":" + s.Config.Name
rm.streams[key] = s.copyGroup()
for _, c := range s.consumers {
key := accName + ":" + s.Config.Name + ":" + c.Config.Durable
rm.consumers[key] = c.copyGroup()
}
}
for _, osa := range streams {
for _, oca := range osa.consumers {
oca.deleted = true
ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
nc++
}
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
ns++
}
js.mu.Unlock()
js.mu.RUnlock()

s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)",
accName, len(rm.streams), len(rm.consumers), hasAccount)
s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)

for _, ca := range rm.consumers {
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
}
for _, sa := range rm.streams {
cc.meta.Propose(encodeDeleteStreamAssignment(sa))
}
resp.Initiated = true

s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}

Expand Down
116 changes: 88 additions & 28 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6399,11 +6399,11 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho
func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
state := mset.state()

// Adjust if FirstSeq has moved.
if snap.FirstSeq > state.FirstSeq && state.FirstSeq != 0 {
// Always adjust if FirstSeq has moved beyond our state.
if snap.FirstSeq > state.FirstSeq {
mset.store.Compact(snap.FirstSeq)
state = mset.store.State()
mset.setLastSeq(snap.LastSeq)
mset.setLastSeq(state.LastSeq)
}
// Range the deleted and delete if applicable.
for _, dseq := range snap.Deleted {
Expand Down Expand Up @@ -6536,11 +6536,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
var ErrStreamStopped = errors.New("stream has been stopped")

defer func() {
if e == ErrServerNotRunning || e == ErrStreamStopped {
// Wipe our raft state if exiting with these errors.
n.Wipe()
// Don't bother resuming if server or stream is gone.
if e != ErrStreamStopped && e != ErrServerNotRunning {
n.ResumeApply()
}
n.ResumeApply()
}()

// Set our catchup state.
Expand All @@ -6550,24 +6549,10 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
var sub *subscription
var err error

const maxActivityInterval = 10 * time.Second
const minActivityInterval = time.Second
activityInterval := minActivityInterval
const activityInterval = 10 * time.Second
notActive := time.NewTimer(activityInterval)
defer notActive.Stop()

var gotMsgs bool
getActivityInterval := func() time.Duration {
if gotMsgs || activityInterval == maxActivityInterval {
return maxActivityInterval
}
activityInterval *= 2
if activityInterval > maxActivityInterval {
activityInterval = maxActivityInterval
}
return activityInterval
}

defer func() {
if sub != nil {
s.sysUnsubscribe(sub)
Expand Down Expand Up @@ -6599,6 +6584,33 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) (e error) {
// On exit, we will release our semaphore if we acquired it.
defer releaseSyncOutSem()

// Check our final state when we exit cleanly.
// If this snapshot was for messages no longer held by the leader we want to make sure
// we are synced for the next message sequence properly.
lastRequested := sreq.LastSeq
checkFinalState := func() {
if mset != nil {
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
var didReset bool
firstExpected := lastRequested + 1
if state.FirstSeq != firstExpected {
// Reset our notion of first.
mset.store.Compact(firstExpected)
mset.store.FastState(&state)
// Make sure last is also correct in case this also moved.
mset.lseq = state.LastSeq
didReset = true
}
mset.mu.Unlock()
if didReset {
s.Warnf("Catchup for stream '%s > %s' resetting first sequence: %d on catchup complete",
mset.account(), mset.name(), firstExpected)
}
}
}

RETRY:
// On retry, we need to release the semaphore we got. Call will be no-op
// if releaseSem boolean has not been set to true on successfully getting
Expand Down Expand Up @@ -6630,7 +6642,7 @@ RETRY:
default:
}
}
notActive.Reset(getActivityInterval())
notActive.Reset(activityInterval)

// Grab sync request again on failures.
if sreq == nil {
Expand All @@ -6642,6 +6654,8 @@ RETRY:
if sreq == nil {
return nil
}
// Reset notion of lastRequested
lastRequested = sreq.LastSeq
}

// Used to transfer message from the wire to another Go routine internally.
Expand All @@ -6665,8 +6679,11 @@ RETRY:
err = nil
goto RETRY
}
// Send our sync request.
b, _ := json.Marshal(sreq)
s.sendInternalMsgLocked(subject, reply, nil, b)
// Remember when we sent this out to avoid loop spins on errors below.
reqSendTime := time.Now()

// Clear our sync request and capture last.
last := sreq.LastSeq
Expand All @@ -6676,17 +6693,26 @@ RETRY:
for qch, lch := n.QuitC(), n.LeadChangeC(); ; {
select {
case <-msgsQ.ch:
gotMsgs = true
notActive.Reset(getActivityInterval())
notActive.Reset(activityInterval)

mrecs := msgsQ.pop()

// Send acks first for longer RTT situations.
for _, mreci := range mrecs {
Comment thread
derekcollison marked this conversation as resolved.
mrec := mreci.(*im)
if mrec.reply != _EMPTY_ {
s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil)
}
}

for _, mreci := range mrecs {
mrec := mreci.(*im)
msg := mrec.msg

// Check for eof signaling.
if len(msg) == 0 {
msgsQ.recycle(&mrecs)
checkFinalState()
return nil
}
if lseq, err := mset.processCatchupMsg(msg); err == nil {
Expand All @@ -6707,11 +6733,21 @@ RETRY:
} else {
s.Warnf("Catchup for stream '%s > %s' errored, will retry: %v", mset.account(), mset.name(), err)
msgsQ.recycle(&mrecs)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that in any condition where we return an error or we goto RETRY, we should use the current mrec's reply to send the error back to the leader. The leader does not care of the content at the moment, so it would be backward compatible, but in runCatchup() we could have the sub check that if the FC reply's body is not empty, we know that the remote has stopped the catchup and the body content is the error message.
We would need to coordinate with that sub and the rest of the go routine loop, but that is perfectly doable. This would help by "releasing" resources on the runCatchup because otherwise that routine will wait up to 5 seconds without activity from the remote before returning (which then releases the outb that counts toward the server-wide limit if I am not mistaken).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that idea, once I free up will take a look.

// Make sure we do not spin and make things worse.
const minRetryWait = 2 * time.Second
elapsed := time.Since(reqSendTime)
if elapsed < minRetryWait {
select {
case <-s.quitCh:
return ErrServerNotRunning
case <-qch:
return ErrStreamStopped
case <-time.After(minRetryWait - elapsed):
}
}
goto RETRY
}
if mrec.reply != _EMPTY_ {
s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, nil)
}
}
msgsQ.recycle(&mrecs)
case <-notActive.C:
Expand Down Expand Up @@ -7087,6 +7123,20 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
notActive := time.NewTimer(activityInterval)
defer notActive.Stop()

// Grab our state.
var state StreamState
mset.mu.RLock()
mset.store.FastState(&state)
mset.mu.RUnlock()

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages we still need to send skips for those.
if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
sreq.FirstSeq = state.FirstSeq
}

// Setup sequences to walk through.
seq, last := sreq.FirstSeq, sreq.LastSeq
mset.setCatchupPeer(sreq.Peer, last-seq)
Expand All @@ -7096,7 +7146,16 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Update our activity timer.
notActive.Reset(activityInterval)

// Check if we know we will not enter the loop because we are done.
if seq > last {
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}

var smv StoreMsg

for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbTotal() <= maxTotalCatchupOutBytes; seq++ {
sm, err := mset.store.LoadMsg(seq, &smv)
// if this is not a deleted msg, bail out.
Expand Down Expand Up @@ -7130,6 +7189,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Skip record for deleted msg.
em = encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq, 0)
}

// Place size in reply subject for flow control.
l := int64(len(em))
reply := fmt.Sprintf(ackReplyT, l)
Expand Down
Loading