diff --git a/server/client.go b/server/client.go index 52360059d87..fef19dd5285 100644 --- a/server/client.go +++ b/server/client.go @@ -4684,19 +4684,32 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } // Remember that leaf in case we don't find any other candidate. + // We already start randomly in lqs slice, so we don't need + // to do a random swap if we already have an rsub like we do + // when src == ROUTER above. if rsub == nil { rsub = sub } continue } else { - // We would be picking a route, but if we had remembered a "hub" leaf, - // then pick that one instead of the route. - if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() { - break + // We want to favor qsubs in our own cluster. If the routed + // qsub has an origin, it means that it is on behalf of a leaf. + // We need to treat it differently. + if len(sub.origin) > 0 { + // If we already have an rsub, nothing to do. Also, do + // not pick a routed qsub for a LEAF origin cluster + // that is the same as where the message comes from. + if rsub == nil && (leafOrigin == _EMPTY_ || leafOrigin != bytesToString(sub.origin)) { + rsub = sub + } + continue } + // This is a qsub that is local on the remote server (or + // we are connected to an older server and we don't know). + // Pick this one and be done. rsub = sub + break } - break } // Assume delivery subject is normal subject to this point. diff --git a/server/const.go b/server/const.go index b54773239a6..97d0a03ecd2 100644 --- a/server/const.go +++ b/server/const.go @@ -171,6 +171,9 @@ const ( // MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto. MAX_HPUB_ARGS = 4 + // MAX_RSUB_ARGS Maximum possible number of arguments from a RS+/LS+ proto. + MAX_RSUB_ARGS = 6 + // DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto. 
DEFAULT_MAX_CLOSED_CLIENTS = 10000 diff --git a/server/consumer.go b/server/consumer.go index 7c1a04855c5..ebe97f65b59 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -505,7 +505,7 @@ func checkConsumerCfg( } // Check if we have a BackOff defined that MaxDeliver is within range etc. - if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && config.MaxDeliver <= lbo { + if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver { return NewJSConsumerMaxDeliverBackoffError() } @@ -1843,7 +1843,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { } // Check if BackOff is defined, MaxDeliver is within range. - if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && ncfg.MaxDeliver <= lbo { + if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver { return NewJSConsumerMaxDeliverBackoffError() } @@ -2206,9 +2206,7 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) { n += binary.PutUvarint(b[n:], dc) n += binary.PutVarint(b[n:], ts) o.propose(b[:n]) - } - if o.store != nil { - // Update local state always. + } else if o.store != nil { o.store.UpdateDelivered(dseq, sseq, dc, ts) } // Update activity. @@ -3839,7 +3837,7 @@ func (o *consumer) checkAckFloor() { // We will set it explicitly to 1 behind our current lowest in pending, or if // pending is empty, to our current delivered -1. 
const minOffThreshold = 50 - if o.asflr < ss.FirstSeq-minOffThreshold { + if ss.FirstSeq >= minOffThreshold && o.asflr < ss.FirstSeq-minOffThreshold { var psseq, pdseq uint64 for seq, p := range o.pending { if psseq == 0 || seq < psseq { @@ -5245,12 +5243,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { if dflag { n.Delete() } else { - // Try to install snapshot on clean exit - if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) { - if snap, err := o.store.EncodedState(); err == nil { - n.InstallSnapshot(snap) - } - } n.Stop() } } @@ -5595,8 +5587,9 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { o.mu.Lock() // Update our check floor. - if seq > o.chkflr { - o.chkflr = seq + // Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work. + if asflr+1 > o.chkflr { + o.chkflr = asflr + 1 } // See if we need to process this update if our parent stream is not a limits policy stream. state, _ = o.store.State() diff --git a/server/filestore.go b/server/filestore.go index 99e8c2d179b..c17a4795e67 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -9320,14 +9320,6 @@ func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error { } func (o *consumerFileStore) Update(state *ConsumerState) error { - o.mu.Lock() - defer o.mu.Unlock() - - // Check to see if this is an outdated update. - if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { - return nil - } - // Sanity checks. if state.AckFloor.Consumer > state.Delivered.Consumer { return fmt.Errorf("bad ack floor for consumer") @@ -9355,6 +9347,15 @@ func (o *consumerFileStore) Update(state *ConsumerState) error { } } + // Replace our state. + o.mu.Lock() + defer o.mu.Unlock() + + // Check to see if this is an outdated update. 
+ if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { + return fmt.Errorf("old update ignored") + } + o.state.Delivered = state.Delivered o.state.AckFloor = state.AckFloor o.state.Pending = pending diff --git a/server/gateway.go b/server/gateway.go index 82df196e2f1..46dd7260ec7 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1900,7 +1900,7 @@ func (c *client) processGatewayAccountSub(accName string) error { // the sublist if present. // func (c *client) processGatewayRUnsub(arg []byte) error { - accName, subject, queue, err := c.parseUnsubProto(arg) + _, accName, subject, queue, err := c.parseUnsubProto(arg, true, false) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ebc5c7bc801..1ad3fa52f14 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2405,7 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // fully recovered from disk. isRecovering := true - // Should only to be called from leader. doSnapshot := func() { if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta { return @@ -4939,23 +4938,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } // Process the change. - if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { + if err := js.processConsumerLeaderChange(o, isLeader); err == nil { // Check our state if we are under an interest based stream. if mset := o.getStream(); mset != nil { var ss StreamState mset.store.FastState(&ss) o.checkStateForInterestStream(&ss) } - // Do a snapshot. - doSnapshot(true) - // Synchronize followers to our state. Only send out if we have state and nothing pending. 
- if n != nil { - if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 { - if snap, err := o.store.EncodedState(); err == nil { - n.SendSnapshot(snap) - } - } - } } // We may receive a leader change after the consumer assignment which would cancel us @@ -5110,25 +5099,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea buf := e.Data switch entryOp(buf[0]) { case updateDeliveredOp: - // These are handled in place in leaders. - if !isLeader { - dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:]) - if err != nil { - if mset, node := o.streamAndNode(); mset != nil && node != nil { - s := js.srv - s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]", - mset.account(), mset.name(), o, node.Group()) - } - panic(err.Error()) - } - // Make sure to update delivered under the lock. - o.mu.Lock() - err = o.store.UpdateDelivered(dseq, sseq, dc, ts) - o.ldt = time.Now() - o.mu.Unlock() - if err != nil { - panic(err.Error()) + dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:]) + if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) } + panic(err.Error()) + } + // Make sure to update delivered under the lock. 
+ o.mu.Lock() + err = o.store.UpdateDelivered(dseq, sseq, dc, ts) + o.ldt = time.Now() + o.mu.Unlock() + if err != nil { + panic(err.Error()) } case updateAcksOp: dseq, sseq, err := decodeAckUpdate(buf[1:]) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 5499881c091..cfd7596a763 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3543,7 +3543,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) { s.WaitForShutdown() } -// If a consumer has not been registered (possible in heavily loaded systems with lots of assets) +// If a consumer has not been registered (possible in heavily loaded systems with lots of assets) // it could miss the signal of a message going away. If that message was pending and expires the // ack floor could fall below the stream first sequence. This test will force that condition and // make sure the system resolves itself. @@ -3566,7 +3566,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { sub, err := js.PullSubscribe("foo", "C") require_NoError(t, err) - for i := 0; i < 10; i++ { + // Publish as many messages as the ack floor check threshold +5. + totalMessages := 55 + for i := 0; i < totalMessages; i++ { sendStreamMsg(t, nc, "foo", "HELLO") } @@ -3610,10 +3612,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { o := mset.lookupConsumer("C") require_NotNil(t, o) o.mu.Lock() - err = o.setStoreState(state) + o.applyState(state) cfs := o.store.(*consumerFileStore) o.mu.Unlock() - require_NoError(t, err) // The lower layer will ignore, so set more directly. cfs.mu.Lock() cfs.state = *state @@ -3631,10 +3632,10 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) { ci, err := js.ConsumerInfo("TEST", "C") require_NoError(t, err) // Make sure we catch this and adjust. 
- if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 { + if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 { return nil } - return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor) + return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor) }) } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index ce17ddf51b2..a8d5505c68e 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -18,6 +18,7 @@ package server import ( "context" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -3726,7 +3727,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { for _, n := range server.raftNodes { rn := n.(*raft) if rn.accName == "$G" { + rn.Lock() rn.updateLeader(noLeader) + rn.Unlock() } } @@ -3990,3 +3993,315 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } } + +func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up to 1. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + sub, err := js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + err = msgs[0].AckSync() + require_NoError(t, err) + + // We don't need the client anymore. 
+ nc.Close() + + lookupConsumer := func(s *Server) *consumer { + t.Helper() + mset, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + acc, err := mset.lookupStream("TEST") + require_NoError(t, err) + o := acc.lookupConsumer("CONSUMER") + require_NotNil(t, o) + return o + } + + // Grab current consumer leader before moving all into observer mode. + cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") + for _, s := range c.servers { + // Put all consumer's RAFT into observer mode, this will prevent all servers from trying to become leader. + o := lookupConsumer(s) + o.node.SetObserver(true) + if s != cl { + // For all followers, pause apply so they only store messages in WAL but not apply and possibly snapshot. + err = o.node.PauseApply() + require_NoError(t, err) + } + } + + updateDeliveredBuffer := func() []byte { + var b [4*binary.MaxVarintLen64 + 1]byte + b[0] = byte(updateDeliveredOp) + n := 1 + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 1) + n += binary.PutVarint(b[n:], time.Now().UnixNano()) + return b[:n] + } + + updateAcksBuffer := func() []byte { + var b [2*binary.MaxVarintLen64 + 1]byte + b[0] = byte(updateAcksOp) + n := 1 + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 100) + return b[:n] + } + + // Store an uncommitted entry into our WAL, which will be committed and applied later. + co := lookupConsumer(cl) + rn := co.node.(*raft) + rn.Lock() + entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}} + ae := encode(t, rn.buildAppendEntry(entries)) + err = rn.storeToWAL(ae) + minPindex := rn.pindex + rn.Unlock() + require_NoError(t, err) + + // Simulate leader change, we do this so we can check what happens in the upper layer logic. + rn.leadc <- true + rn.SetObserver(false) + + // Since upper layer is async, we don't know whether it will or will not act on the leader change. 
+ // Wait for some time to check if it does. + time.Sleep(2 * time.Second) + rn.RLock() + maxPindex := rn.pindex + rn.RUnlock() + + r := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") + ro := lookupConsumer(r) + rn = ro.node.(*raft) + + checkFor(t, 5*time.Second, time.Second, func() error { + rn.RLock() + defer rn.RUnlock() + if rn.pindex < maxPindex { + return fmt.Errorf("rn.pindex too low, expected %d, got %d", maxPindex, rn.pindex) + } + return nil + }) + + // We should only have 'Normal' entries. + // If we'd get a 'Snapshot' entry, that would mean it had incomplete state and would be reverting committed state. + var state StreamState + rn.wal.FastState(&state) + for seq := minPindex; seq <= maxPindex; seq++ { + ae, err = rn.loadEntry(seq) + require_NoError(t, err) + for _, entry := range ae.entries { + require_Equal(t, entry.Type, EntryNormal) + } + } +} + +func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + // Wait for all servers to have applied everything. 
+ var maxApplied uint64 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + maxApplied = 0 + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + _, _, applied := mset.node.Progress() + if maxApplied == 0 { + maxApplied = applied + } else if applied < maxApplied { + return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) + } else if applied > maxApplied { + return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + } + } + return nil + }) + + // Install a snapshot on a follower. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + err = mset.node.InstallSnapshot(mset.stateSnapshotLocked()) + require_NoError(t, err) + + // Validate the snapshot reflects applied. + validateStreamState := func(snap *snapshot) { + t.Helper() + require_Equal(t, snap.lastIndex, maxApplied) + ss, err := DecodeStreamState(snap.data) + require_NoError(t, err) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + } + snap, err := mset.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) + + // Simulate a message being stored, but not calling Applied yet. + err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano()) + require_NoError(t, err) + + // Simulate the stream being stopped before we're able to call Applied. + // If we'd install a snapshot during this, which would be a race condition, + // we'd store a snapshot with state that's ahead of applied. + err = mset.stop(false, false) + require_NoError(t, err) + + // Validate the snapshot is the same as before. 
+ snap, err = mset.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) +} + +func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + sub, err := js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + err = msgs[0].AckSync() + require_NoError(t, err) + + // Wait for all servers to have applied everything. + var maxApplied uint64 + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + maxApplied = 0 + for _, s := range c.servers { + acc, err := s.lookupAccount(globalAccountName) + if err != nil { + return err + } + mset, err := acc.lookupStream("TEST") + if err != nil { + return err + } + o := mset.lookupConsumer("CONSUMER") + if o == nil { + return errors.New("consumer not found") + } + _, _, applied := o.node.Progress() + if maxApplied == 0 { + maxApplied = applied + } else if applied < maxApplied { + return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied) + } else if applied > maxApplied { + return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied) + } + } + return nil + }) + + // Install a snapshot on a follower. 
+ s := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + snapBytes, err := o.store.EncodedState() + require_NoError(t, err) + err = o.node.InstallSnapshot(snapBytes) + require_NoError(t, err) + + // Validate the snapshot reflects applied. + validateStreamState := func(snap *snapshot) { + t.Helper() + require_Equal(t, snap.lastIndex, maxApplied) + state, err := decodeConsumerState(snap.data) + require_NoError(t, err) + require_Equal(t, state.Delivered.Consumer, 1) + require_Equal(t, state.Delivered.Stream, 1) + } + snap, err := o.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) + + // Simulate a message being delivered, but not calling Applied yet. + err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano()) + require_NoError(t, err) + + // Simulate the consumer being stopped before we're able to call Applied. + // If we'd install a snapshot during this, which would be a race condition, + // we'd store a snapshot with state that's ahead of applied. + err = o.stop() + require_NoError(t, err) + + // Validate the snapshot is the same as before. 
+ snap, err = o.node.(*raft).loadLastSnapshot() + require_NoError(t, err) + validateStreamState(snap) +} diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 7ecdc454999..8e62dbb12a0 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -959,6 +959,22 @@ func TestJetStreamConsumerBackOff(t *testing.T) { }, shouldErr: false, }, + { + name: "backoff_with_max_deliver_equal", + config: nats.ConsumerConfig{ + MaxDeliver: 3, + BackOff: []time.Duration{time.Second, time.Minute, time.Hour}, + }, + shouldErr: false, + }, + { + name: "backoff_with_max_deliver_equal_to_zero", + config: nats.ConsumerConfig{ + MaxDeliver: 0, + BackOff: []time.Duration{}, + }, + shouldErr: false, + }, { name: "backoff_with_max_deliver_smaller", config: nats.ConsumerConfig{ @@ -1469,3 +1485,44 @@ func TestJetStreamConsumerBackoffNotRespectedWithMultipleInflightRedeliveries(t } } } + +func TestJetStreamConsumerBackoffWhenBackoffLengthIsEqualToMaxDeliverConfig(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + }) + require_NoError(t, err) + + maxDeliver := 3 + backoff := []time.Duration{time.Second, 2 * time.Second, 3 * time.Second} + sub, err := js.SubscribeSync( + "events.>", + nats.MaxDeliver(maxDeliver), + nats.BackOff(backoff), + nats.AckExplicit(), + ) + require_NoError(t, err) + + calculateExpectedBackoff := func(numDelivered int) time.Duration { + return backoff[numDelivered-1] + 50*time.Millisecond // 50ms of margin to system overhead + } + + // message to be redelivered using backoff duration. 
+ firstMsgSent := time.Now() + sendStreamMsg(t, nc, "events.first", "msg-1") + _, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(1)) + _, err = sub.NextMsg(2 * time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(2)) + _, err = sub.NextMsg(3 * time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3)) +} diff --git a/server/leafnode.go b/server/leafnode.go index e40cfcab894..e163a6a2797 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2271,6 +2271,42 @@ func keyFromSub(sub *subscription) string { return sb.String() } +const ( + keyRoutedSub = "R" + keyRoutedSubByte = 'R' + keyRoutedLeafSub = "L" + keyRoutedLeafSubByte = 'L' +) + +// Helper function to build the key that prevents collisions between normal +// routed subscriptions and routed subscriptions on behalf of a leafnode. +// Keys will look like this: +// "R foo" -> plain routed sub on "foo" +// "R foo bar" -> queue routed sub on "foo", queue "bar" +// "L foo bar" -> plain routed leaf sub on "foo", leaf "bar" +// "L foo bar baz" -> queue routed sub on "foo", queue "bar", leaf "baz" +func keyFromSubWithOrigin(sub *subscription) string { + var sb strings.Builder + sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue)) + leaf := len(sub.origin) > 0 + if leaf { + sb.WriteByte(keyRoutedLeafSubByte) + } else { + sb.WriteByte(keyRoutedSubByte) + } + sb.WriteByte(' ') + sb.Write(sub.subject) + if sub.queue != nil { + sb.WriteByte(' ') + sb.Write(sub.queue) + } + if leaf { + sb.WriteByte(' ') + sb.Write(sub.origin) + } + return sb.String() +} + // Lock should be held. 
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { if key == _EMPTY_ { @@ -2321,12 +2357,21 @@ func (c *client) processLeafSub(argo []byte) (err error) { args := splitArg(arg) sub := &subscription{client: c} + delta := int32(1) switch len(args) { case 1: sub.queue = nil case 3: sub.queue = args[1] sub.qw = int32(parseSize(args[2])) + // TODO: (ik) We should have a non empty queue name and a queue + // weight >= 1. For 2.11, we may want to return an error if that + // is not the case, but for now just overwrite `delta` if queue + // weight is greater than 1 (it is possible after a reconnect/ + // server restart to receive a queue weight > 1 for a new sub). + if sub.qw > 1 { + delta = sub.qw + } default: return fmt.Errorf("processLeafSub Parse Error: '%s'", arg) } @@ -2391,7 +2436,6 @@ func (c *client) processLeafSub(argo []byte) (err error) { key := bytesToString(sub.sid) osub := c.subs[key] updateGWs := false - delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -2472,6 +2516,10 @@ func (c *client) processLeafUnsub(arg []byte) error { // We store local subs by account and subject and optionally queue name. // LS- will have the arg exactly as the key. sub, ok := c.subs[string(arg)] + delta := int32(1) + if ok && len(sub.queue) > 0 { + delta = sub.qw + } c.mu.Unlock() if ok { @@ -2481,14 +2529,14 @@ func (c *client) processLeafUnsub(arg []byte) error { if !spoke { // If we are routing subtract from the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, -1) + srv.updateRouteSubscriptionMap(acc, sub, -delta) // Gateways if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + srv.gatewayUpdateSubInterest(acc.Name, sub, -delta) } } // Now check on leafnode updates for other leaf nodes. 
- acc.updateLeafNodes(sub, -1) + acc.updateLeafNodes(sub, -delta) return nil } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 1bc37fff0fd..9c8bb729006 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2470,7 +2470,7 @@ func (l *parseRouteLSUnsubLogger) Errorf(format string, v ...any) { func (l *parseRouteLSUnsubLogger) Tracef(format string, v ...any) { trace := fmt.Sprintf(format, v...) - if strings.Contains(trace, "LS- $G foo bar") { + if strings.Contains(trace, "LS- xyz $G foo bar") { l.gotTrace <- struct{}{} } } @@ -4380,7 +4380,7 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { checkLeafNodeConnected(t, a2) checkClusterFormed(t, a1, a2) - // Create out client connections to all servers where we may need to have + // Create our client connections to all servers where we may need to have // queue subscriptions. ncD1 := natsConnect(t, d1.ClientURL(), nats.UserInfo("user", "pwd")) defer ncD1.Close() @@ -4421,7 +4421,7 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { } else { nc = ncB2 } - natsPub(t, nc, subj, []byte("hello")) + natsPub(t, nc, subj, []byte(fmt.Sprintf("msg_%d", i+1))) } } @@ -4532,23 +4532,549 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { // Check that appropriate queue subs receive all messages. checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { - // When there is (are) qsub(s) on b, then only A and B should - // get the messages. Otherwise, it should be between A and D - n := aCount.Load() - if test.b1 || test.b2 { - n += bCount.Load() - } else { - n += dCount.Load() - } + n := aCount.Load() + bCount.Load() + dCount.Load() if n == int32(total) { return nil } return fmt.Errorf("Got only %v/%v messages (a=%v b=%v d=%v)", n, total, aCount.Load(), bCount.Load(), dCount.Load()) }) - // For this specific case, make sure that D did not receive any. 
+ // When there is (are) qsub(s) on b, then only B should + // get the messages. Otherwise, it should be between A and D if test.b1 || test.b2 { - require_LessThan(t, dCount.Load(), 1) + require_Equal(t, aCount.Load(), 0) + require_Equal(t, dCount.Load(), 0) + } else { + require_Equal(t, bCount.Load(), 0) + // We should have receive some on A and D + require_True(t, aCount.Load() > 0) + require_True(t, dCount.Load() > 0) + } + }) + } +} + +func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) { + + // Note that this is not what a normal configuration should be. Users should + // configure each leafnode to have the URLs of both B1 and B2 so that when + // a server fails, the leaf can reconnect to the other running server. But + // we force it to be this way to demonstrate what the issue was and see that + // it is now fixed. + // + // B1 <--- route ---> B2 + // | | + // Leaf Leaf + // | | + // A1 <--- route ---> A2 + // + + for _, test := range []struct { + name string + pinnedAccount string + }{ + {"without pinned account", _EMPTY_}, + {"with pinned account", "accounts: [\"A\"]"}, + } { + t.Run(test.name, func(t *testing.T) { + leafBConf := ` + accounts { A { users: [{user:a, password: pwd}] } } + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: HUB + listen: "127.0.0.1:-1" + %s + %s + } + leafnodes { + listen: "127.0.0.1:-1" + no_advertise: true + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B1", _EMPTY_, test.pinnedAccount))) + b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), test.pinnedAccount))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + leafAConf := ` + accounts { A { users: [{user:a, password: pwd}] } } + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: LEAF + 
listen: "127.0.0.1:-1" + %s + %s + } + leafnodes { + listen: "127.0.0.1:-1" + remotes: [ + { + url: "nats://a:pwd@127.0.0.1:%d" + account: A + } + ] + no_advertise: true + } + ` + a1Conf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, "A1", _EMPTY_, test.pinnedAccount, b1Opts.LeafNode.Port))) + a1, a1Opts := RunServerWithConfig(a1Conf) + defer a1.Shutdown() + + checkLeafNodeConnected(t, b1) + checkLeafNodeConnected(t, a1) + + a2Conf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, "A2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", a1Opts.Cluster.Port), test.pinnedAccount, b2Opts.LeafNode.Port))) + a2, _ := RunServerWithConfig(a2Conf) + defer a2.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a2) + checkClusterFormed(t, a1, a2) + + // Create a client on A2 and 3 queue subs. + ncA2 := natsConnect(t, a2.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncA2.Close() + + var qsubs []*nats.Subscription + for i := 0; i < 3; i++ { + qsubs = append(qsubs, natsQueueSub(t, ncA2, "foo", "queue", func(_ *nats.Msg) {})) + } + natsFlush(t, ncA2) + + subj := "foo" + checkInterest := func(expected bool) { + t.Helper() + for _, s := range []*Server{a1, a2, b1, b2} { + acc, err := s.LookupAccount("A") + require_NoError(t, err) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + i := acc.Interest(subj) + if expected && i == 0 { + return fmt.Errorf("Still no interest on %q in server %q", subj, s) + } else if !expected && i > 0 { + return fmt.Errorf("Still interest on %q in server %q", subj, s) + } + return nil + }) + } } + checkInterest(true) + + // Check that Leafz from A1 (which connects to B1) has the expected sub + // interest on "foo". + checkLeafA1 := func(expected bool) { + t.Helper() + // We will wait a bit before checking Leafz since with the bug, it would + // take a bit of time after the action to reproduce the issue for the + // LS+ to be sent to the wrong cluster, or the interest to not be removed. 
+ time.Sleep(100 * time.Millisecond) + // Now check Leafz + leafsz, err := a1.Leafz(&LeafzOptions{Subscriptions: true}) + require_NoError(t, err) + require_Equal(t, leafsz.NumLeafs, 1) + require_True(t, leafsz.Leafs[0] != nil) + lz := leafsz.Leafs[0] + require_Equal(t, lz.Name, "B1") + require_Equal(t, lz.NumSubs, uint32(len(lz.Subs))) + var ok bool + for _, sub := range lz.Subs { + if sub == "foo" { + if expected { + ok = true + break + } + t.Fatalf("Did not expect to have the %q subscription", sub) + } + } + if expected && !ok { + t.Fatalf("Expected to have the %q subscription", "foo") + } + } + checkLeafA1(false) + + // Now restart server "B1". We need to create a conf file with the ports + // that it used. + restartBConf := createConfFile(t, []byte(fmt.Sprintf(` + accounts { A { users: [{user:a, password: pwd}] } } + server_name: B1 + listen: "127.0.0.1:%d" + cluster { + name: HUB + listen: "127.0.0.1:%d" + %s + } + leafnodes { + listen: "127.0.0.1:%d" + no_advertise: true + } + `, b1Opts.Port, b1Opts.Cluster.Port, test.pinnedAccount, b1Opts.LeafNode.Port))) + b1.Shutdown() + b1, _ = RunServerWithConfig(restartBConf) + defer b1.Shutdown() + + checkLeafNodeConnected(t, b1) + checkLeafNodeConnected(t, a1) + + // Stop one of the queue sub. + qsubs[0].Unsubscribe() + natsFlush(t, ncA2) + + // Check that "foo" does not show up in the subscription list + // for the leaf from A1 to B1. + checkLeafA1(false) + + // Now stop the other 2 and check again. + qsubs[1].Unsubscribe() + qsubs[2].Unsubscribe() + natsFlush(t, ncA2) + checkInterest(false) + + checkLeafA1(false) + + // Now recreate 3 queue subs. + for i := 0; i < 3; i++ { + natsQueueSub(t, ncA2, "foo", "queue", func(_ *nats.Msg) {}) + } + // Check interest is present in all servers + checkInterest(true) + // But A1's leaf to B1 should still not have a sub interest for "foo". + checkLeafA1(false) + + // Now stop the client connection instead of removing queue sub + // one at a time. 
This will ensure that we properly handle an LS- + // on B2 with an interest with a queue weight more than 1 still + // present at the time of processing. + ncA2.Close() + checkInterest(false) + + checkLeafA1(false) + + // We will now test that if the queue subs are created on B2, + // we have proper interest on A1, but when we close the connection, + // the interest disappears. + ncB2 := natsConnect(t, b2.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncB2.Close() + + for i := 0; i < 3; i++ { + natsQueueSub(t, ncB2, "foo", "queue", func(_ *nats.Msg) {}) + } + checkInterest(true) + checkLeafA1(true) + // Close the connection, so all queue subs should be removed at once. + ncB2.Close() + checkInterest(false) + checkLeafA1(false) + }) + } +} + +func TestLeafNodeQueueWeightCorrectOnRestart(t *testing.T) { + leafBConf := ` + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: HUB + listen: "127.0.0.1:-1" + %s + } + leafnodes { + listen: "127.0.0.1:-1" + no_advertise: true + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B1", _EMPTY_))) + b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port)))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + leafAConf := ` + server_name: LEAF + listen: "127.0.0.1:-1" + leafnodes { + remotes: [{url: "nats://127.0.0.1:%d"}] + reconnect: "50ms" + } + ` + aConf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, b2Opts.LeafNode.Port))) + a, _ := RunServerWithConfig(aConf) + defer a.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a) + + nc := natsConnect(t, a.ClientURL()) + defer nc.Close() + + for i := 0; i < 2; i++ { + natsQueueSubSync(t, nc, "foo", "queue") + } + natsFlush(t, nc) + + checkQueueWeight := func() { + for _, s := range []*Server{b1, b2} { + gacc := s.GlobalAccount() + 
gacc.mu.RLock() + sl := gacc.sl + gacc.mu.RUnlock() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + // For remote queue interest, Match() will expand to queue weight. + // So we should have 1 group and 2 queue subs present. + res := sl.Match("foo") + for _, qsubs := range res.qsubs { + for _, sub := range qsubs { + if string(sub.subject) == "foo" && string(sub.queue) == "queue" && atomic.LoadInt32(&sub.qw) == 2 { + return nil + } + } + } + return fmt.Errorf("Server %q does not have expected queue interest with expected weight", s) + }) + } + } + checkQueueWeight() + + // Now restart server "B2". We need to create a conf file with the ports + // that it used. + restartBConf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: B2 + listen: "127.0.0.1:%d" + cluster { + name: HUB + listen: "127.0.0.1:%d" + %s + } + leafnodes { + listen: "127.0.0.1:%d" + no_advertise: true + } + `, b2Opts.Port, b2Opts.Cluster.Port, fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), b2Opts.LeafNode.Port))) + b2.Shutdown() + b2, _ = RunServerWithConfig(restartBConf) + defer b2.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a) + checkQueueWeight() +} + +func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) { + for _, test := range []struct { + name string + pinnedAccount string + lnocu bool + }{ + {"without pinned account", _EMPTY_, true}, + {"with pinned account", "accounts: [\"XYZ\"]", true}, + {"old server without pinned account", _EMPTY_, false}, + {"old server with pinned account", "accounts: [\"XYZ\"]", false}, + } { + t.Run(test.name, func(t *testing.T) { + leafBConf := ` + accounts: {XYZ {users:[{user:a, password:pwd}]}} + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: HUB + listen: "127.0.0.1:-1" + %s + %s + } + leafnodes { + listen: "127.0.0.1:-1" + no_advertise: true + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B1", _EMPTY_, test.pinnedAccount))) + 
b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), test.pinnedAccount))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + // To make route connections behave like if the server was connected + // to an older server, change the routes' lnocu field. + if !test.lnocu { + for _, s := range []*Server{b1, b2} { + s.mu.RLock() + s.forEachRoute(func(r *client) { + r.mu.Lock() + r.route.lnocu = false + r.mu.Unlock() + }) + s.mu.RUnlock() + } + } + + // This leaf will have a cluster name that matches an account name. + // The idea is to make sure that hub servers are not using incorrect + // keys to differentiate a routed queue interest on subject "A" with + // queue name "foo" for account "A" in their cluster: "RS+ A A foo" + // with a leafnode plain subscription, which since there is an origin + // would be: "LS+ A A foo", that is, origin is "A", account is "A" + // and subject is "foo". 
+ leafAConf := ` + accounts: {XYZ {users:[{user:a, password:pwd}]}} + server_name: LEAF + listen: "127.0.0.1:-1" + cluster { + name: XYZ + listen: "127.0.0.1:-1" + } + leafnodes { + remotes: [ + { + url: "nats://a:pwd@127.0.0.1:%d" + account: "XYZ" + } + ] + } + ` + aConf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, b2Opts.LeafNode.Port))) + a, _ := RunServerWithConfig(aConf) + defer a.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a) + + ncB2 := natsConnect(t, b2.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncB2.Close() + // Create a plain sub on "foo" + natsSubSync(t, ncB2, "foo") + // And a queue sub on "XYZ" with queue name "foo" + natsQueueSubSync(t, ncB2, "XYZ", "foo") + natsFlush(t, ncB2) + + ncA := natsConnect(t, a.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncA.Close() + // From the leafnode, create a plain sub on "foo" + natsSubSync(t, ncA, "foo") + // And a queue sub on "XYZ" with queue name "foo" + natsQueueSubSync(t, ncA, "XYZ", "foo") + natsFlush(t, ncA) + + // Check the acc.rm on B2 + b2Acc, err := b2.LookupAccount("XYZ") + require_NoError(t, err) + + rsubKey := keyFromSubWithOrigin(&subscription{subject: []byte("foo")}) + rqsubKey := keyFromSubWithOrigin(&subscription{subject: []byte("XYZ"), queue: []byte("foo")}) + rlsubKey := keyFromSubWithOrigin(&subscription{origin: []byte("XYZ"), subject: []byte("foo")}) + rlqsubKey := keyFromSubWithOrigin(&subscription{origin: []byte("XYZ"), subject: []byte("XYZ"), queue: []byte("foo")}) + // Ensure all keys are different + require_True(t, rsubKey != rqsubKey && rqsubKey != rlsubKey && rlsubKey != rlqsubKey) + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + b2Acc.mu.RLock() + defer b2Acc.mu.RUnlock() + for _, key := range []string{rsubKey, rqsubKey, rlsubKey, rlqsubKey} { + v, ok := b2Acc.rm[key] + if !ok { + return fmt.Errorf("Did not find key %q", key) + } + if v != 1 { + return fmt.Errorf("Key %q v=%v", key, v) + } + 
} + return nil + }) + + // Now check that on B1, we have 2 distinct subs for the route. + b1Acc, err := b1.LookupAccount("XYZ") + require_NoError(t, err) + + var route *client + + if test.pinnedAccount == _EMPTY_ { + b1Acc.mu.RLock() + rIdx := b1Acc.routePoolIdx + b1Acc.mu.RUnlock() + b1.mu.RLock() + b1.forEachRouteIdx(rIdx, func(r *client) bool { + route = r + return false + }) + b1.mu.RUnlock() + } else { + b1.mu.RLock() + remotes := b1.accRoutes["XYZ"] + for _, r := range remotes { + route = r + break + } + b1.mu.RUnlock() + } + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + // Check that route.subs has 4 entries for the subs we + // created in this test. + var entries []string + route.mu.Lock() + for key := range route.subs { + if strings.Contains(key, "foo") { + entries = append(entries, key) + } + } + route.mu.Unlock() + // With new servers, we expect 4 entries, but with older servers, + // we have collisions and have only 2. + var expected int + if test.lnocu { + expected = 4 + } else { + expected = 2 + } + if len(entries) != expected { + return fmt.Errorf("Expected %d entries with %q, got this: %q", expected, "foo", entries) + } + return nil + }) + + // Close the connections and expect all gone. 
+ ncB2.Close() + ncA.Close() + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + b2Acc.mu.RLock() + defer b2Acc.mu.RUnlock() + for _, key := range []string{rsubKey, rqsubKey, rlsubKey, rlqsubKey} { + if _, ok := b2Acc.rm[key]; ok { + return fmt.Errorf("Key %q still present", key) + } + } + return nil + }) + checkFor(t, time.Second, 10*time.Millisecond, func() error { + var entries []string + route.mu.Lock() + for key := range route.subs { + if strings.Contains(key, "foo") { + entries = append(entries, key) + } + } + route.mu.Unlock() + if len(entries) != 0 { + return fmt.Errorf("Still routed subscriptions on %q: %q", "foo", entries) + } + return nil + }) }) } } diff --git a/server/memstore.go b/server/memstore.go index 5baf42870be..25636a32431 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1640,8 +1640,6 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { pending = make(map[uint64]*Pending, len(state.Pending)) for seq, p := range state.Pending { pending[seq] = &Pending{p.Sequence, p.Timestamp} - } - for seq := range pending { if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream { return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq) } @@ -1656,10 +1654,10 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { // Replace our state. o.mu.Lock() + defer o.mu.Unlock() // Check to see if this is an outdated update. 
- if state.Delivered.Consumer < o.state.Delivered.Consumer { - o.mu.Unlock() + if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream { return fmt.Errorf("old update ignored") } @@ -1667,7 +1665,6 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { o.state.AckFloor = state.AckFloor o.state.Pending = pending o.state.Redelivered = redelivered - o.mu.Unlock() return nil } diff --git a/server/norace_test.go b/server/norace_test.go index 5932a207073..e9cf7be692e 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -8510,13 +8510,16 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. c := &cluster{t: t, servers: make([]*Server, 3), opts: make([]*Options, 3), name: "F3"} // S1 - conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, "route://127.0.0.1:15622, route://127.0.0.1:16622") + // The route connection to S2 must be through a slow proxy + np12 := createNetProxy(10*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:15622", true) + routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np12.routeURL()) + conf := fmt.Sprintf(tmpl, "S1", t.TempDir(), 14622, routes) c.servers[0], c.opts[0] = RunServerWithConfig(createConfFile(t, []byte(conf))) // S2 - // Create the proxy first. Connect this to S1. Make it slow, e.g. 5ms RTT. 
- np := createNetProxy(1*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true) - routes := fmt.Sprintf("%s, route://127.0.0.1:16622", np.routeURL()) + // The route connection to S1 must be through a slow proxy + np21 := createNetProxy(10*time.Millisecond, 1024*1024*1024, 1024*1024*1024, "route://127.0.0.1:14622", true) + routes = fmt.Sprintf("%s, route://127.0.0.1:16622", np21.routeURL()) conf = fmt.Sprintf(tmpl, "S2", t.TempDir(), 15622, routes) c.servers[1], c.opts[1] = RunServerWithConfig(createConfFile(t, []byte(conf))) @@ -8527,13 +8530,21 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. c.checkClusterFormed() c.waitOnClusterReady() defer c.shutdown() - defer np.stop() + defer np12.stop() + defer np21.stop() - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() + slow := c.servers[0] // Expecting pre-acks here. + sl := c.servers[1] // Stream leader, will publish here. + cl := c.servers[2] // Consumer leader, will consume & ack here. + + snc, sjs := jsClientConnect(t, sl) + defer snc.Close() + + cnc, cjs := jsClientConnect(t, cl) + defer cnc.Close() // Now create the stream. - _, err := js.AddStream(&nats.StreamConfig{ + _, err := sjs.AddStream(&nats.StreamConfig{ Name: "EVENTS", Subjects: []string{"EV.>"}, Replicas: 3, @@ -8542,7 +8553,6 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. require_NoError(t, err) // Make sure it's leader is on S2. - sl := c.servers[1] checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { c.waitOnStreamLeader(globalAccountName, "EVENTS") if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl { @@ -8553,7 +8563,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. }) // Now create the consumer. 
- _, err = js.AddConsumer("EVENTS", &nats.ConsumerConfig{ + _, err = sjs.AddConsumer("EVENTS", &nats.ConsumerConfig{ Durable: "C", AckPolicy: nats.AckExplicitPolicy, DeliverSubject: "dx", @@ -8561,7 +8571,6 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. require_NoError(t, err) // Make sure the consumer leader is on S3. - cl := c.servers[2] checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C") if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl { @@ -8571,37 +8580,36 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamPreAck(t *testing. return nil }) - // Create the real consumer on the consumer leader to make it efficient. - nc, js = jsClientConnect(t, cl) - defer nc.Close() - - _, err = js.Subscribe(_EMPTY_, func(msg *nats.Msg) { + _, err = cjs.Subscribe(_EMPTY_, func(msg *nats.Msg) { msg.Ack() }, nats.BindStream("EVENTS"), nats.Durable("C"), nats.ManualAck()) require_NoError(t, err) + // Publish directly on the stream leader to make it efficient. for i := 0; i < 1_000; i++ { - _, err := js.PublishAsync("EVENTS.PAID", []byte("ok")) + _, err := sjs.PublishAsync("EV.PAID", []byte("ok")) require_NoError(t, err) } select { - case <-js.PublishAsyncComplete(): + case <-sjs.PublishAsyncComplete(): case <-time.After(5 * time.Second): t.Fatalf("Did not receive completion signal") } - slow := c.servers[0] mset, err := slow.GlobalAccount().lookupStream("EVENTS") require_NoError(t, err) - // Make sure preAck is non-nil, so we know the logic has kicked in. 
- mset.mu.RLock() - preAcks := mset.preAcks - mset.mu.RUnlock() - require_NotNil(t, preAcks) - - checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { state := mset.state() + if state.LastSeq != 1000 { + return fmt.Errorf("Haven't received all messages yet (last seq %d)", state.LastSeq) + } + mset.mu.RLock() + preAcks := mset.preAcks + mset.mu.RUnlock() + if preAcks == nil { + return fmt.Errorf("Expected to have preAcks by now") + } if state.Msgs == 0 { mset.mu.RLock() lp := len(mset.preAcks) @@ -11065,17 +11073,19 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { // Was > 30 ms before fix for comparison, M2 macbook air. require_LessThan(t, elapsed, 5*time.Millisecond) - // Make sure we set the chkflr correctly. - checkFloor := func(o *consumer) uint64 { + // Make sure we set the chkflr correctly. The chkflr should be equal to asflr+1. + // Otherwise, if chkflr would be set higher a subsequent call to checkInterestState will be ineffective. + requireFloorsEqual := func(o *consumer) { + t.Helper() require_True(t, o != nil) o.mu.RLock() defer o.mu.RUnlock() - return o.chkflr + require_Equal(t, o.chkflr, o.asflr+1) } - require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001) - require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001) + requireFloorsEqual(mset.lookupConsumer("A")) + requireFloorsEqual(mset.lookupConsumer("B")) + requireFloorsEqual(mset.lookupConsumer("C")) // Expire all the blocks again. 
expireAllBlks() diff --git a/server/parser.go b/server/parser.go index 74f55f576d2..663a1dc1268 100644 --- a/server/parser.go +++ b/server/parser.go @@ -788,7 +788,8 @@ func (c *client) parse(buf []byte) error { c.traceInOp("LS-", arg) } } - err = c.processRemoteUnsub(arg) + leafUnsub := c.op == 'L' || c.op == 'l' + err = c.processRemoteUnsub(arg, leafUnsub) case GATEWAY: if trace { c.traceInOp("RS-", arg) diff --git a/server/raft.go b/server/raft.go index a0dc6402b33..267297ab235 100644 --- a/server/raft.go +++ b/server/raft.go @@ -198,7 +198,7 @@ type raft struct { hcommit uint64 // The commit at the time that applies were paused pobserver bool // Whether we were an observer at the time that applies were paused - prop *ipQueue[*Entry] // Proposals + prop *ipQueue[*proposedEntry] // Proposals entry *ipQueue[*appendEntry] // Append entries resp *ipQueue[*appendEntryResponse] // Append entries responses apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer) @@ -209,6 +209,11 @@ type raft struct { quit chan struct{} // Raft group shutdown } +type proposedEntry struct { + *Entry + reply string // Optional, to respond once proposal handled +} + // cacthupState structure that holds our subscription, and catchup term and index // as well as starting term and index and how many updates we have seen. 
type catchupState struct { @@ -387,7 +392,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), - prop: newIPQueue[*Entry](s, qpfx+"entry"), + prop: newIPQueue[*proposedEntry](s, qpfx+"entry"), entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"), resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"), apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"), @@ -716,7 +721,7 @@ func (n *raft) Propose(data []byte) error { if werr := n.werr; werr != nil { return werr } - n.prop.push(newEntry(EntryNormal, data)) + n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_)) return nil } @@ -735,20 +740,21 @@ func (n *raft) ProposeMulti(entries []*Entry) error { return werr } for _, e := range entries { - n.prop.push(e) + n.prop.push(newProposedEntry(e, _EMPTY_)) } return nil } // ForwardProposal will forward the proposal to the leader if known. // If we are the leader this is the same as calling propose. -// FIXME(dlc) - We could have a reply subject and wait for a response -// for retries, but would need to not block and be in separate Go routine. func (n *raft) ForwardProposal(entry []byte) error { if n.Leader() { return n.Propose(entry) } + // TODO: Currently we do not set a reply subject, even though we are + // now capable of responding. Do this once enough time has passed, + // i.e. maybe in 2.12. n.sendRPC(n.psubj, _EMPTY_, entry) return nil } @@ -767,7 +773,7 @@ func (n *raft) ProposeAddPeer(peer string) error { prop := n.prop n.RUnlock() - prop.push(newEntry(EntryAddPeer, []byte(peer))) + prop.push(newProposedEntry(newEntry(EntryAddPeer, []byte(peer)), _EMPTY_)) return nil } @@ -803,7 +809,7 @@ func (n *raft) ProposeRemovePeer(peer string) error { // peer remove and then notifying the rest of the group that the // peer was removed. 
if isLeader { - prop.push(newEntry(EntryRemovePeer, []byte(peer))) + prop.push(newProposedEntry(newEntry(EntryRemovePeer, []byte(peer)), _EMPTY_)) n.doRemovePeerAsLeader(peer) return nil } @@ -1484,7 +1490,7 @@ func (n *raft) StepDown(preferred ...string) error { // If we have a new leader selected, transfer over to them. if maybeLeader != noLeader { n.debug("Selected %q for new leader", maybeLeader) - prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader))) + prop.push(newProposedEntry(newEntry(EntryLeaderTransfer, []byte(maybeLeader)), _EMPTY_)) } else { // Force us to stepdown here. n.debug("Stepping down") @@ -2105,6 +2111,26 @@ func (ae *appendEntry) returnToPool() { aePool.Put(ae) } +// Pool for proposedEntry re-use. +var pePool = sync.Pool{ + New: func() any { + return &proposedEntry{} + }, +} + +// Create a new proposedEntry. +func newProposedEntry(entry *Entry, reply string) *proposedEntry { + pe := pePool.Get().(*proposedEntry) + pe.Entry, pe.reply = entry, reply + return pe +} + +// Will return this proosed entry. +func (pe *proposedEntry) returnToPool() { + pe.Entry, pe.reply = nil, _EMPTY_ + pePool.Put(pe) +} + type EntryType uint8 const ( @@ -2314,7 +2340,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ // Need to copy since this is underlying client/route buffer. peer := copyBytes(msg) - prop.push(newEntry(EntryRemovePeer, peer)) + prop.push(newProposedEntry(newEntry(EntryRemovePeer, peer), reply)) } // Called when a peer has forwarded a proposal. 
@@ -2335,7 +2361,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, return } - prop.push(newEntry(EntryNormal, msg)) + prop.push(newProposedEntry(newEntry(EntryNormal, msg), reply)) } func (n *raft) runAsLeader() { @@ -2404,7 +2430,7 @@ func (n *raft) runAsLeader() { if b.Type == EntryRemovePeer { n.doRemovePeerAsLeader(string(b.Data)) } - entries = append(entries, b) + entries = append(entries, b.Entry) // If this is us sending out a leadership transfer stepdown inline here. if b.Type == EntryLeaderTransfer { // Send out what we have and switch to follower. @@ -2429,6 +2455,13 @@ func (n *raft) runAsLeader() { if len(entries) > 0 { n.sendAppendEntry(entries) } + // Respond to any proposals waiting for a confirmation. + for _, pe := range es { + if pe.reply != _EMPTY_ { + n.sendReply(pe.reply, nil) + } + pe.returnToPool() + } n.prop.recycle(&es) case <-hb.C: diff --git a/server/reload.go b/server/reload.go index 347fcfd8b79..07e5d021ad5 100644 --- a/server/reload.go +++ b/server/reload.go @@ -2172,15 +2172,22 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) { } deleteRoutedSubs = deleteRoutedSubs[:0] route.mu.Lock() + pa, _, hasSubType := route.getRoutedSubKeyInfo() for key, sub := range route.subs { - if an := strings.Fields(key)[0]; an != accName { - continue + // If this is not a pinned-account route, we need to get the + // account name from the key to see if we collect this sub. + if !pa { + if an := getAccNameFromRoutedSubKey(sub, key, hasSubType); an != accName { + continue + } } // If we can't export, we need to drop the subscriptions that // we have on behalf of this route. + // Need to make a string cast here since canExport call sl.Match() subj := string(sub.subject) if !route.canExport(subj) { - delete(route.subs, string(sub.sid)) + // We can use bytesToString() here. 
+ delete(route.subs, bytesToString(sub.sid)) deleteRoutedSubs = append(deleteRoutedSubs, sub) } } diff --git a/server/route.go b/server/route.go index 0341f79868e..0c455547c98 100644 --- a/server/route.go +++ b/server/route.go @@ -74,6 +74,7 @@ type route struct { didSolicit bool retry bool lnoc bool + lnocu bool routeType RouteType url *url.URL authRequired bool @@ -112,6 +113,7 @@ type connectInfo struct { Cluster string `json:"cluster"` Dynamic bool `json:"cluster_dynamic,omitempty"` LNOC bool `json:"lnoc,omitempty"` + LNOCU bool `json:"lnocu,omitempty"` // Support for LS- with origin cluster name Gateway string `json:"gateway,omitempty"` } @@ -767,6 +769,7 @@ func (c *client) processRouteInfo(info *Info) { c.route.gatewayURL = info.GatewayURL c.route.remoteName = info.Name c.route.lnoc = info.LNOC + c.route.lnocu = info.LNOCU c.route.jetstream = info.JetStream // When sent through route INFO, if the field is set, it should be of size 1. @@ -1169,6 +1172,36 @@ type asubs struct { subs []*subscription } +// Returns the account name from the subscription's key. +// This is invoked knowing that the key contains an account name, so for a sub +// that is not from a pinned-account route. +// The `keyHasSubType` boolean indicates that the key starts with the indicator +// for leaf or regular routed subscriptions. +func getAccNameFromRoutedSubKey(sub *subscription, key string, keyHasSubType bool) string { + var accIdx int + if keyHasSubType { + // Start after the sub type indicator. + accIdx = 1 + // But if there is an origin, bump its index. + if len(sub.origin) > 0 { + accIdx = 2 + } + } + return strings.Fields(key)[accIdx] +} + +// Returns if the route is dedicated to an account, its name, and a boolean +// that indicates if this route uses the routed subscription indicator at +// the beginning of the subscription key. +// Lock held on entry. 
+func (c *client) getRoutedSubKeyInfo() (bool, string, bool) { + var accName string + if an := c.route.accName; len(an) > 0 { + accName = string(an) + } + return accName != _EMPTY_, accName, c.route.lnocu +} + // removeRemoteSubs will walk the subs and remove them from the appropriate account. func (c *client) removeRemoteSubs() { // We need to gather these on a per account basis. @@ -1178,14 +1211,18 @@ func (c *client) removeRemoteSubs() { srv := c.srv subs := c.subs c.subs = nil + pa, accountName, hasSubType := c.getRoutedSubKeyInfo() c.mu.Unlock() for key, sub := range subs { c.mu.Lock() sub.max = 0 c.mu.Unlock() - // Grab the account - accountName := strings.Fields(key)[0] + // If not a pinned-account route, we need to find the account + // name from the sub's key. + if !pa { + accountName = getAccNameFromRoutedSubKey(sub, key, hasSubType) + } ase := as[accountName] if ase == nil { if v, ok := srv.accounts.Load(accountName); ok { @@ -1197,10 +1234,14 @@ func (c *client) removeRemoteSubs() { } else { ase.subs = append(ase.subs, sub) } + delta := int32(1) + if len(sub.queue) > 0 { + delta = sub.qw + } if srv.gateway.enabled { - srv.gatewayUpdateSubInterest(accountName, sub, -1) + srv.gatewayUpdateSubInterest(accountName, sub, -delta) } - ase.acc.updateLeafNodes(sub, -1) + ase.acc.updateLeafNodes(sub, -delta) } // Now remove the subs by batch for each account sublist. 
@@ -1217,8 +1258,9 @@ func (c *client) removeRemoteSubs() { // Lock is held on entry func (c *client) removeRemoteSubsForAcc(name string) []*subscription { var subs []*subscription + _, _, hasSubType := c.getRoutedSubKeyInfo() for key, sub := range c.subs { - an := strings.Fields(key)[0] + an := getAccNameFromRoutedSubKey(sub, key, hasSubType) if an == name { sub.max = 0 subs = append(subs, sub) @@ -1228,46 +1270,69 @@ func (c *client) removeRemoteSubsForAcc(name string) []*subscription { return subs } -func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { +func (c *client) parseUnsubProto(arg []byte, accInProto, hasOrigin bool) ([]byte, string, []byte, []byte, error) { // Indicate any activity, so pub and sub or unsubs. c.in.subs++ args := splitArg(arg) - var queue []byte - var accountName string - subjIdx := 1 - c.mu.Lock() - if c.kind == ROUTER && c.route != nil { - if accountName = string(c.route.accName); accountName != _EMPTY_ { - subjIdx = 0 - } + var ( + origin []byte + accountName string + queue []byte + subjIdx int + ) + // If `hasOrigin` is true, then it means this is a LS- with origin in proto. + if hasOrigin { + // We would not be here if there was not at least 1 field. + origin = args[0] + subjIdx = 1 + } + // If there is an account in the protocol, bump the subject index. + if accInProto { + subjIdx++ } - c.mu.Unlock() switch len(args) { case subjIdx + 1: case subjIdx + 2: queue = args[subjIdx+1] default: - return _EMPTY_, nil, nil, fmt.Errorf("parse error: '%s'", arg) + return nil, _EMPTY_, nil, nil, fmt.Errorf("parse error: '%s'", arg) } - if accountName == _EMPTY_ { - accountName = string(args[0]) + if accInProto { + // If there is an account in the protocol, it is before the subject. + accountName = string(args[subjIdx-1]) } - return accountName, args[subjIdx], queue, nil + return origin, accountName, args[subjIdx], queue, nil } // Indicates no more interest in the given account/subject for the remote side. 
-func (c *client) processRemoteUnsub(arg []byte) (err error) { +func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) { srv := c.srv if srv == nil { return nil } - accountName, subject, _, err := c.parseUnsubProto(arg) + + var accountName string + // Assume the account will be in the protocol. + accInProto := true + + c.mu.Lock() + originSupport := c.route.lnocu + if c.route != nil && len(c.route.accName) > 0 { + accountName, accInProto = string(c.route.accName), false + } + c.mu.Unlock() + + hasOrigin := leafUnsub && originSupport + _, accNameFromProto, subject, _, err := c.parseUnsubProto(arg, accInProto, hasOrigin) if err != nil { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } + if accInProto { + accountName = accNameFromProto + } // Lookup the account var acc *Account if v, ok := srv.accounts.Load(accountName); ok { @@ -1284,28 +1349,43 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { } updateGWs := false - // We store local subs by account and subject and optionally queue name. - // RS- will have the arg exactly as the key. + + _keya := [128]byte{} + _key := _keya[:0] + var key string - if c.kind == ROUTER && c.route != nil && len(c.route.accName) > 0 { - key = accountName + " " + bytesToString(arg) - } else { + if !originSupport { + // If it is an LS- or RS-, we use the protocol as-is as the key. key = bytesToString(arg) + } else { + // We need to prefix with the sub type. + if leafUnsub { + _key = append(_key, keyRoutedLeafSubByte) + } else { + _key = append(_key, keyRoutedSubByte) + } + _key = append(_key, ' ') + _key = append(_key, arg...) + key = bytesToString(_key) } + delta := int32(1) sub, ok := c.subs[key] if ok { delete(c.subs, key) acc.sl.Remove(sub) updateGWs = srv.gateway.enabled + if len(sub.queue) > 0 { + delta = sub.qw + } } c.mu.Unlock() if updateGWs { - srv.gatewayUpdateSubInterest(accountName, sub, -1) + srv.gatewayUpdateSubInterest(accountName, sub, -delta) } // Now check on leafnode updates. 
- acc.updateLeafNodes(sub, -1) + acc.updateLeafNodes(sub, -delta) if c.opts.Verbose { c.sendOK() @@ -1322,35 +1402,78 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { return nil } - // Copy so we do not reference a potentially large buffer - arg := make([]byte, len(argo)) - copy(arg, argo) + // We copy `argo` to not reference the read buffer. However, we will + // prefix with a code that says if the remote sub is for a leaf + // (hasOrigin == true) or not to prevent key collisions. Imagine: + // "RS+ foo bar baz 1\r\n" => "foo bar baz" (a routed queue sub) + // "LS+ foo bar baz\r\n" => "foo bar baz" (a route leaf sub on "baz", + // for account "bar" with origin "foo"). + // + // The sub.sid/key will be set respectively to "R foo bar baz" and + // "L foo bar baz". + // + // We also no longer add the account if it was not present (due to + // pinned-account route) since there is no need really. + // + // For routes to older server, we will still create the "arg" with + // the above layout, but we will create the sub.sid/key as before, + // that is, not including the origin for LS+ because older server + // only send LS- without origin, so we would not be able to find + // the sub in the map. + c.mu.Lock() + accountName := string(c.route.accName) + oldStyle := !c.route.lnocu + c.mu.Unlock() - args := splitArg(arg) + // Indicate if the account name should be in the protocol. It would be the + // case if accountName is empty. + accInProto := accountName == _EMPTY_ + + // Copy so we do not reference a potentially large buffer. + // Add 2 more bytes for the routed sub type. + arg := make([]byte, 0, 2+len(argo)) + if hasOrigin { + arg = append(arg, keyRoutedLeafSubByte) + } else { + arg = append(arg, keyRoutedSubByte) + } + arg = append(arg, ' ') + arg = append(arg, argo...) + + // Now split to get all fields. Unroll splitArgs to avoid runtime/heap issues. 
+ a := [MAX_RSUB_ARGS][]byte{} + args := a[:0] + start := -1 + for i, b := range arg { + switch b { + case ' ', '\t', '\r', '\n': + if start >= 0 { + args = append(args, arg[start:i]) + start = -1 + } + default: + if start < 0 { + start = i + } + } + } + if start >= 0 { + args = append(args, arg[start:]) + } + + delta := int32(1) sub := &subscription{client: c} - // This value indicate what is the mandatory subject offset in the args - // slice. It varies based on the optional presence of origin or account name - // fields (tha latter would not be present for "per-account" routes). - var subjIdx int - // If account is present, this is its "char" position in arg slice. - var accPos int + // There will always be at least a subject, but its location will depend + // on if there is an origin, an account name, etc.. Since we know that + // we have added the sub type indicator as the first field, the subject + // position will be at minimum at index 1. + subjIdx := 1 if hasOrigin { - // Set to 1, will be adjusted if the account is also expected. - subjIdx = 1 - sub.origin = args[0] - // The account would start after the origin and trailing space. - accPos = len(sub.origin) + 1 + subjIdx++ } - c.mu.Lock() - accountName := string(c.route.accName) - c.mu.Unlock() - // If the route is dedicated to an account, accountName will not - // be empty. If it is, then the account must be in the protocol. - var accInProto bool - if accountName == _EMPTY_ { + if accInProto { subjIdx++ - accInProto = true } switch len(args) { case subjIdx + 1: @@ -1358,15 +1481,50 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { case subjIdx + 3: sub.queue = args[subjIdx+1] sub.qw = int32(parseSize(args[subjIdx+2])) + // TODO: (ik) We should have a non empty queue name and a queue + // weight >= 1. 
For 2.11, we may want to return an error if that + // is not the case, but for now just overwrite `delta` if queue + // weight is greater than 1 (it is possible after a reconnect/ + // server restart to receive a queue weight > 1 for a new sub). + if sub.qw > 1 { + delta = sub.qw + } default: return fmt.Errorf("processRemoteSub Parse Error: '%s'", arg) } + // We know that the number of fields is correct. So we can access args[] based + // on where we expect the fields to be. + + // If there is an origin, it will be at index 1. + if hasOrigin { + sub.origin = args[1] + } + // For subject, use subjIdx. sub.subject = args[subjIdx] - // If the account name is empty (not a "per-account" route), the account - // is at the index prior to the subject. - if accountName == _EMPTY_ { + // If the account name is in the protocol, it will be before the subject. + if accInProto { accountName = bytesToString(args[subjIdx-1]) } + // Now set the sub.sid from the arg slice. However, we will have a different + // one if we use the origin or not. + start = 0 + end := len(arg) + if sub.queue != nil { + // Remove the ' ' from the arg length. + end -= 1 + len(args[subjIdx+2]) + } + if oldStyle { + // We will start at the account (if present) or at the subject. + // We first skip the "R " or "L " + start = 2 + // And if there is an origin skip that. + if hasOrigin { + start += len(sub.origin) + 1 + } + // Here we are pointing at the account (if present), or at the subject. + } + sub.sid = arg[start:end] + // Lookup account while avoiding fetch. // A slow fetch delays subsequent remote messages. It also avoids the expired check (see below). // With all but memory resolver lookup can be delayed or fail. @@ -1424,33 +1582,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { return nil } - // We store local subs by account and subject and optionally queue name. - // If we have a queue it will have a trailing weight which we do not want. 
- if sub.queue != nil { - // if the account is in the protocol, we can reference directly "arg", - // otherwise, we need to allocate/construct the sid. - if accInProto { - sub.sid = arg[accPos : accPos+len(accountName)+1+len(sub.subject)+1+len(sub.queue)] - } else { - // It is unfortunate that we have to do this, but the gain of not - // having the account name in message protocols outweight the - // penalty of having to do this here for the processing of a - // subscription. - sub.sid = append(sub.sid, accountName...) - sub.sid = append(sub.sid, ' ') - sub.sid = append(sub.sid, sub.subject...) - sub.sid = append(sub.sid, ' ') - sub.sid = append(sub.sid, sub.queue...) - } - } else if accInProto { - sub.sid = arg[accPos:] - } else { - sub.sid = append(sub.sid, accountName...) - sub.sid = append(sub.sid, ' ') - sub.sid = append(sub.sid, sub.subject...) - } - key := bytesToString(sub.sid) - acc.mu.RLock() // For routes (this can be called by leafnodes), check if the account is // transitioning (from pool to dedicated route) and this route is not a @@ -1465,9 +1596,11 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { } sl := acc.sl acc.mu.RUnlock() + + // We use the sub.sid for the key of the c.subs map. + key := bytesToString(sub.sid) osub := c.subs[key] updateGWs := false - delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1509,10 +1642,14 @@ func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *s if isSubProto { buf = append(buf, lSubBytes...) buf = append(buf, sub.origin...) + buf = append(buf, ' ') } else { buf = append(buf, lUnsubBytes...) + if c.route.lnocu { + buf = append(buf, sub.origin...) + buf = append(buf, ' ') + } } - buf = append(buf, ' ') } else { if isSubProto { buf = append(buf, rSubBytes...) 
@@ -1613,18 +1750,27 @@ func (s *Server) sendSubsToRoute(route *client, idx int, account string) { for _, a := range accs { a.mu.RLock() for key, n := range a.rm { - var subj, qn []byte - s := strings.Split(key, " ") - subj = []byte(s[0]) - if len(s) > 1 { - qn = []byte(s[1]) + var origin, qn []byte + s := strings.Fields(key) + // Subject will always be the second field (index 1). + subj := stringToBytes(s[1]) + // Check if the key is for a leaf (will be field 0). + forLeaf := s[0] == keyRoutedLeafSub + // For queue, if not for a leaf, we need 3 fields "R foo bar", + // but if for a leaf, we need 4 fields "L foo bar leaf_origin". + if l := len(s); (!forLeaf && l == 3) || (forLeaf && l == 4) { + qn = stringToBytes(s[2]) + } + if forLeaf { + // The leaf origin will be the last field. + origin = stringToBytes(s[len(s)-1]) } - // s[0] is the subject and already as a string, so use that + // s[1] is the subject and already as a string, so use that // instead of converting back `subj` to a string. - if !route.canImport(s[0]) { + if !route.canImport(s[1]) { continue } - sub := subscription{subject: subj, queue: qn, qw: n} + sub := subscription{origin: origin, subject: subj, queue: qn, qw: n} buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true) } a.mu.RUnlock() @@ -2286,8 +2432,9 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del return } - // Create the fast key which will use the subject or 'subjectqueue' for queue subscribers. - key := keyFromSub(sub) + // Create the subscription key which will prevent collisions between regular + // and leaf routed subscriptions. See keyFromSubWithOrigin() for details. + key := keyFromSubWithOrigin(sub) // Decide whether we need to send an update out to all the routes. 
update := isq @@ -2481,6 +2628,7 @@ func (s *Server) startRouteAcceptLoop() { Domain: s.info.Domain, Dynamic: s.isClusterNameDynamic(), LNOC: true, + LNOCU: true, } // For tests that want to simulate old servers, do not set the compression // on the INFO protocol if configured with CompressionNotSupported. @@ -2795,6 +2943,7 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error c.mu.Lock() c.route.remoteID = c.opts.Name c.route.lnoc = proto.LNOC + c.route.lnocu = proto.LNOCU c.setRoutePermissions(perms) c.headers = supportsHeaders && proto.Headers c.mu.Unlock() diff --git a/server/routes_test.go b/server/routes_test.go index 95338fabdca..6696fb7b7bd 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1634,9 +1634,11 @@ func TestClusterQueueGroupWeightTrackingLeak(t *testing.T) { check := func(present bool, expected int32) { t.Helper() + sub := subscription{subject: []byte("foo"), queue: []byte("bar")} + key := keyFromSubWithOrigin(&sub) checkFor(t, time.Second, 15*time.Millisecond, func() error { acc.mu.RLock() - v, ok := acc.lqws["foo bar"] + v, ok := acc.lqws[key] acc.mu.RUnlock() if present { if !ok { diff --git a/server/server.go b/server/server.go index 099a466ca81..e6f6c728d42 100644 --- a/server/server.go +++ b/server/server.go @@ -94,6 +94,7 @@ type Info struct { Import *SubjectPermission `json:"import,omitempty"` Export *SubjectPermission `json:"export,omitempty"` LNOC bool `json:"lnoc,omitempty"` + LNOCU bool `json:"lnocu,omitempty"` InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the server INFO response to CONNECT RoutePoolSize int `json:"route_pool_size,omitempty"` diff --git a/server/stream.go b/server/stream.go index 57a1fa015aa..1c0824cdb4d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5183,8 +5183,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error 
{ n.Delete() sa = mset.sa } else { - // Always attempt snapshot on clean exit. - n.InstallSnapshot(mset.stateSnapshotLocked()) n.Stop() } } diff --git a/server/test_test.go b/server/test_test.go index 1dc3a4e90ab..81509b83d63 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -19,6 +19,7 @@ import ( "math/rand" "net/url" "os" + "reflect" "strings" "testing" "time" @@ -74,10 +75,15 @@ func require_NoError(t testing.TB, err error) { } } -func require_NotNil(t testing.TB, v any) { +func require_NotNil[T any](t testing.TB, v T) { t.Helper() - if v == nil { - t.Fatalf("require not nil, but got: %v", v) + r := reflect.ValueOf(v) + switch k := r.Kind(); k { + case reflect.Ptr, reflect.Interface, reflect.Slice, + reflect.Map, reflect.Chan, reflect.Func: + if r.IsNil() { + t.Fatalf("require not nil, but got nil") + } } } diff --git a/test/new_routes_test.go b/test/new_routes_test.go index c8cc0b79a5a..5810fe4eaf7 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -58,6 +58,9 @@ func TestNewRouteInfoOnConnect(t *testing.T) { if !info.LNOC { t.Fatalf("Expected to have leafnode origin cluster support") } + if !info.LNOCU { + t.Fatalf("Expected to have leafnode origin cluster in unsub protocol support") + } } func TestNewRouteHeaderSupport(t *testing.T) { @@ -1713,29 +1716,93 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) { info.ID = routeID info.Name = "" info.LNOC = true + // Overwrite to false to check that we are getting LS- without origin + // if we are an old server. 
+ info.LNOCU = false
 b, err := json.Marshal(info)
 if err != nil {
 t.Fatalf("Could not marshal test route info: %v", err)
 }
 routeSend(fmt.Sprintf("INFO %s\r\n", b))
- routeExpect(rsubRe)
+ routeExpect(rlsubRe)
 pingPong()

- // Make sure it can process and LS+
- routeSend("LS+ ln1 $G foo\r\n")
- pingPong()
+ sendLSProtosFromRoute := func(lnocu bool) {
+ t.Helper()

- if !gacc.SubscriptionInterest("foo") {
- t.Fatalf("Expected interest on \"foo\"")
- }
+ // Make sure it can process an LS+
+ routeSend("LS+ ln1 $G foo\r\n")
+ pingPong()

- // This should not have been sent to the leafnode since same origin cluster.
- time.Sleep(10 * time.Millisecond)
- if lgacc.SubscriptionInterest("foo") {
- t.Fatalf("Did not expect interest on \"foo\"")
+ // Check interest is registered on remote server.
+ if !gacc.SubscriptionInterest("foo") {
+ t.Fatalf("Expected interest on \"foo\"")
+ }
+
+ // This should not have been sent to the leafnode since same origin cluster.
+ time.Sleep(10 * time.Millisecond)
+ if lgacc.SubscriptionInterest("foo") {
+ t.Fatalf("Did not expect interest on \"foo\"")
+ }
+
+ // Now unsub. Either act as an old server that does not support origin
+ // in the LS- or as a new server.
+ if lnocu {
+ routeSend("LS- ln1 $G foo\r\n")
+ } else {
+ routeSend("LS- $G foo\r\n")
+ }
+ pingPong()
+
+ // Interest should be gone.
+ if gacc.SubscriptionInterest("foo") {
+ t.Fatalf("Expected no interest on \"foo\"")
+ }
+
+ // Make sure we did not incorrectly send an interest to the leaf.
+ time.Sleep(10 * time.Millisecond)
+ if lgacc.SubscriptionInterest("foo") {
+ t.Fatalf("Did not expect interest on \"foo\"")
+ }
+
+ // Repeat with a queue.
+ routeSend("LS+ ln1 $G foo bar 1\r\n")
+ pingPong()
+
+ if !gacc.SubscriptionInterest("foo") {
+ t.Fatalf("Expected interest on \"foo\"")
+ }
+
+ // This should not have been sent to the leafnode since same origin cluster. 
+ time.Sleep(10 * time.Millisecond) + if lgacc.SubscriptionInterest("foo") { + t.Fatalf("Did not expect interest on \"foo\"") + } + + // Now unsub. + if lnocu { + routeSend("LS- ln1 $G foo bar\r\n") + } else { + routeSend("LS- $G foo bar\r\n") + } + pingPong() + + // Subscription should be gone. + if gacc.SubscriptionInterest("foo") { + t.Fatalf("Expected no interest on \"foo\"") + } + + // Make sure we did not incorrectly send an interest to the leaf. + time.Sleep(10 * time.Millisecond) + if lgacc.SubscriptionInterest("foo") { + t.Fatalf("Did not expect interest on \"foo\"") + } } + // Check the LS+/- when not supporting origin in LS- + sendLSProtosFromRoute(false) + // Create a connection on the leafnode server. nc, err := nats.Connect(ln.ClientURL()) if err != nil { @@ -1778,6 +1845,61 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) { if n, _, _ := sub.Pending(); n != 0 { t.Fatalf("Should not have received the message on bar") } + + // Now unsubscribe, we should receive an LS- without origin. + sub.Unsubscribe() + routeExpect(lunsubRe) + + // Quick check for queues + sub, _ = nc.QueueSubscribeSync("baz", "bat") + // Let it propagate to the main server + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if !gacc.SubscriptionInterest("baz") { + return fmt.Errorf("No interest") + } + return nil + }) + // For "baz" + routeExpect(rlsubRe) + sub.Unsubscribe() + routeExpect(lunsubRe) + + // Restart our routed server, but this time indicate support + // for LS- with origin cluster. + rc.Close() + rc = createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port) + defer rc.Close() + + routeSend, routeExpect = setupRouteEx(t, rc, opts, routeID) + + info = checkInfoMsg(t, rc) + info.ID = routeID + info.Name = "" + // These should be already set to true since the server that sends the + // INFO has them enabled, but just be explicit. 
+ info.LNOC = true + info.LNOCU = true + b, err = json.Marshal(info) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + + routeSend(fmt.Sprintf("INFO %s\r\n", b)) + routeExpect(rlsubRe) + pingPong() + + // Check the LS+/LS- + sendLSProtosFromRoute(true) + + sub, _ = nc.SubscribeSync("bar") + routeExpect(rlsubRe) + sub.Unsubscribe() + routeExpect(rlunsubRe) + + sub, _ = nc.QueueSubscribeSync("baz", "bat") + routeExpect(rlsubRe) + sub.Unsubscribe() + routeExpect(rlunsubRe) } // Check that real duplicate subscription (that is, sent by client with same sid) diff --git a/test/test.go b/test/test.go index 08ec926679d..072b8ca95cd 100644 --- a/test/test.go +++ b/test/test.go @@ -364,9 +364,10 @@ var ( asubRe = regexp.MustCompile(`A\+\s+([^\r\n]+)\r\n`) aunsubRe = regexp.MustCompile(`A\-\s+([^\r\n]+)\r\n`) lsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`) - lunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)?\r\n`) + lunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)\s*([^\s]+)?\r\n`) lmsgRe = regexp.MustCompile(`(?:(?:LMSG\s+([^\s]+)\s+(?:([|+]\s+([\w\s]+)|[^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) rlsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`) + rlunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\r\n`) ) const (