diff --git a/server/jetstream.go b/server/jetstream.go index cba11d14efa..4ea6e340537 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -2151,6 +2151,10 @@ func (jsa *jsAccount) storageTotals() (uint64, uint64) { } func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, replicas int) (bool, *ApiError) { + return jsa.wouldExceedLimits(storeType, tierName, replicas, _EMPTY_, nil, nil) +} + +func (jsa *jsAccount) wouldExceedLimits(storeType StorageType, tierName string, replicas int, subj string, hdr, msg []byte) (bool, *ApiError) { jsa.usageMu.RLock() defer jsa.usageMu.RUnlock() @@ -2164,24 +2168,31 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, rep return false, nil } r := int64(replicas) - if r < 1 || tierName == _EMPTY_ { + // Make sure replicas is correct. + if r < 1 { r = 1 } + // This is for limits. If we have no tier, consider all to be flat, vs tiers like R3 where we want to scale limit by replication. + lr := r + if tierName == _EMPTY_ { + lr = 1 + } + // Since tiers are flat we need to scale limit up by replicas when checking. if storeType == MemoryStorage { - totalMem := inUse.total.mem - if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*r { + totalMem := inUse.total.mem + (int64(memStoreMsgSize(subj, hdr, msg)) * r) + if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*lr { return true, nil } - if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*r { + if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*lr { return true, nil } } else { - totalStore := inUse.total.store - if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*r { + totalStore := inUse.total.store + (int64(fileStoreMsgSize(subj, hdr, msg)) * r) + if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*lr { return true, nil } - if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*r { + if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*lr { return true, nil } } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 274634546b6..07bd086df2c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7501,7 +7501,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.mu.RLock() canRespond := !mset.cfg.NoAck && len(reply) > 0 name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store - s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, int64(mset.cfg.Replicas), mset.tier, mset.outq, mset.node + s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed mset.mu.RUnlock() @@ -7554,50 +7554,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } // Check here pre-emptively if we have exceeded our account limits. - var exceeded bool - jsa.usageMu.Lock() - jsaLimits, ok := jsa.limits[tierName] - if !ok { - jsa.usageMu.Unlock() - err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name) - s.RateLimitWarnf(err.Error()) - if canRespond { - var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} - resp.Error = NewJSNoLimitsError() - response, _ = json.Marshal(resp) - outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) + if exceeded, err := jsa.wouldExceedLimits(st, tierName, r, subject, hdr, msg); exceeded { + if err == nil { + err = NewJSAccountResourcesExceededError() } - return err - } - t, ok := jsa.usage[tierName] - if !ok { - t = &jsaStorage{} - jsa.usage[tierName] = t - } - // Make sure replicas is correct. - if r < 1 { - r = 1 - } - // This is for limits. If we have no tier, consider all to be flat, vs tiers like R3 where we want to scale limit by replication. - lr := r - if tierName == _EMPTY_ { - lr = 1 - } - // Tiers are flat, meaning the limit for R3 will be 100GB, not 300GB, so compare to total but adjust limits. - if st == MemoryStorage && jsaLimits.MaxMemory > 0 { - exceeded = t.total.mem+(int64(memStoreMsgSize(subject, hdr, msg))*r) > (jsaLimits.MaxMemory * lr) - } else if jsaLimits.MaxStore > 0 { - exceeded = t.total.store+(int64(fileStoreMsgSize(subject, hdr, msg))*r) > (jsaLimits.MaxStore * lr) - } - jsa.usageMu.Unlock() - - // If we have exceeded our account limits go ahead and return. - if exceeded { - err := fmt.Errorf("JetStream resource limits exceeded for account: %q", jsa.acc().Name) s.RateLimitWarnf(err.Error()) if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} - resp.Error = NewJSAccountResourcesExceededError() + resp.Error = err response, _ = json.Marshal(resp) outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 601b4724361..9a7ae83014b 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6033,3 +6033,324 @@ func TestJetStreamClusterDomainAdvisory(t *testing.T) { require_Equal(t, adv.Cluster, "R3S") require_Equal(t, len(adv.Replicas), 3) } + +func TestJetStreamClusterLimitsBasedStreamFileStoreDesync(t *testing.T) { + conf := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { + store_dir: '%s', + } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + system_account: sys + no_auth_user: js + accounts { + sys { + users = [ + { user: sys, pass: sys } + ] + } + js { + jetstream = { store_max_stream_bytes = 3mb } + users = [ + { user: js, pass: js } + ] + } + }` + c := createJetStreamClusterWithTemplate(t, conf, "limits", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cnc, cjs := jsClientConnect(t, c.randomServer()) + defer cnc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "LTEST", + Subjects: []string{"messages.*"}, + Replicas: 3, + MaxAge: 10 * time.Minute, + MaxMsgs: 100_000, + }) + require_NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + psub, err := cjs.PullSubscribe("messages.*", "consumer") + require_NoError(t, err) + + var ( + wg sync.WaitGroup + received uint64 + errCh = make(chan error, 100_000) + receivedMap = make(map[string]*nats.Msg) + ) + wg.Add(1) + go func() { + tick := time.NewTicker(20 * time.Millisecond) + for { + select { + case <-ctx.Done(): + wg.Done() + return + case <-tick.C: + msgs, err := psub.Fetch(10, nats.MaxWait(200*time.Millisecond)) + if err != nil { + continue + } + for _, msg := range msgs { + received++ + receivedMap[msg.Subject] = msg + if meta, _ := msg.Metadata(); meta.NumDelivered > 1 { + t.Logf("GOT MSG: %s :: %+v :: %d", msg.Subject, meta, len(msg.Data)) + } + msg.Ack() + } + } + } + }() + + // Send 20_000 msgs at roughly 1 msg per msec + shouldDrop := make(map[string]error) + wg.Add(1) + go func() { + payload := []byte(strings.Repeat("A", 1024)) + tick := time.NewTicker(1 * time.Millisecond) + for i := 1; i < 100_000; { + select { + case <-ctx.Done(): + wg.Done() + return + case <-tick.C: + // This should run into 3MB quota and get errors right away + // before the max msgs limit does. + subject := fmt.Sprintf("messages.%d", i) + _, err := js.Publish(subject, payload, nats.RetryAttempts(0)) + if err != nil { + errCh <- err + } + i++ + + // Any message over this number should not be a success + // since the stream should be full due to the quota. + // Here we capture that the messages have failed to confirm. + if err != nil && i > 1000 { + shouldDrop[subject] = err + } + } + } + }() + + // Collect enough errors to cause things to get out of sync. + var errCount int +Setup: + for { + select { + case err = <-errCh: + errCount++ + if errCount >= 20_000 { + // Stop both producing and consuming. + cancel() + break Setup + } + case <-time.After(5 * time.Second): + t.Fatalf("Timed out waiting for limits error") + } + } + + // Both goroutines should be exiting now.. + wg.Wait() + + // Check messages that ought to have been dropped. + for subject := range receivedMap { + found, ok := shouldDrop[subject] + if ok { + t.Errorf("Should have dropped message published on %q since got error: %v", subject, found) + } + } + + getStreamDetails := func(t *testing.T, srv *Server) *StreamDetail { + t.Helper() + jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) + require_NoError(t, err) + if len(jsz.AccountDetails) > 0 && len(jsz.AccountDetails[0].Streams) > 0 { + details := jsz.AccountDetails[0] + stream := details.Streams[0] + return &stream + } + t.Error("Could not find account details") + return nil + } + checkState := func(t *testing.T) error { + t.Helper() + + leaderSrv := c.streamLeader("js", "LTEST") + streamLeader := getStreamDetails(t, leaderSrv) + // t.Logf("Stream Leader: %+v", streamLeader.State) + errs := make([]error, 0) + for _, srv := range c.servers { + if srv == leaderSrv { + // Skip self + continue + } + stream := getStreamDetails(t, srv) + if stream.State.Msgs != streamLeader.State.Msgs { + err := fmt.Errorf("Leader %v has %d messages, Follower %v has %d messages", + stream.Cluster.Leader, streamLeader.State.Msgs, + srv.Name(), stream.State.Msgs, + ) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil + } + + // Confirm state of the leader. + leaderSrv := c.streamLeader("js", "LTEST") + streamLeader := getStreamDetails(t, leaderSrv) + if streamLeader.State.Msgs != received { + t.Errorf("Leader %v has %d messages stored but %d messages were received (delta: %d)", + leaderSrv.Name(), streamLeader.State.Msgs, received, received-streamLeader.State.Msgs) + } + cinfo, err := psub.ConsumerInfo() + require_NoError(t, err) + if received != cinfo.Delivered.Consumer { + t.Errorf("Unexpected consumer sequence. Got: %v, expected: %v", + cinfo.Delivered.Consumer, received) + } + + // Check whether there was a drift among the leader and followers. + var ( + lastErr error + attempts int + ) +Check: + for range time.NewTicker(1 * time.Second).C { + lastErr = checkState(t) + if attempts > 5 { + break Check + } + attempts++ + } + + // Read the stream + psub2, err := cjs.PullSubscribe("messages.*", "") + require_NoError(t, err) + +Consume2: + for { + msgs, err := psub2.Fetch(100) + if err != nil { + continue + } + for _, msg := range msgs { + msg.Ack() + + meta, _ := msg.Metadata() + if meta.NumPending == 0 { + break Consume2 + } + } + } + + cinfo2, err := psub2.ConsumerInfo() + require_NoError(t, err) + + a := cinfo.Delivered.Consumer + b := cinfo2.Delivered.Consumer + if a != b { + t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) + } + + // Test is done but replicas were in sync so can stop testing at this point. + if lastErr == nil { + return + } + + // Now we will cause a few step downs while out of sync to get different results. + t.Errorf("Replicas are out of sync:\n%v", lastErr) + + stepDown := func() { + _, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "LTEST"), nil, time.Second) + } + // Check StreamInfo in this state then trigger a few step downs. + var prevLeaderMsgs uint64 + leaderSrv = c.streamLeader("js", "LTEST") + sinfo, err := js.StreamInfo("LTEST") + prevLeaderMsgs = sinfo.State.Msgs + for i := 0; i < 10; i++ { + stepDown() + time.Sleep(2 * time.Second) + + leaderSrv = c.streamLeader("js", "LTEST") + sinfo, err = js.StreamInfo("LTEST") + if err != nil { + t.Logf("Error: %v", err) + continue + } + if leaderSrv != nil && sinfo != nil { + t.Logf("When leader is %v, Messages: %d", leaderSrv.Name(), sinfo.State.Msgs) + + // Leave the leader as the replica with less messages that was out of sync. + if prevLeaderMsgs > sinfo.State.Msgs { + break + } + } + } + t.Logf("Changed to use leader %v which has %d messages", leaderSrv.Name(), sinfo.State.Msgs) + + // Read the stream again + psub3, err := cjs.PullSubscribe("messages.*", "") + require_NoError(t, err) + +Consume3: + for { + msgs, err := psub3.Fetch(100) + if err != nil { + continue + } + for _, msg := range msgs { + msg.Ack() + + meta, _ := msg.Metadata() + if meta.NumPending == 0 { + break Consume3 + } + } + } + + cinfo3, err := psub3.ConsumerInfo() + require_NoError(t, err) + + // Compare against consumer that was created before resource limits error + // with one created before the step down. + a = cinfo.Delivered.Consumer + b = cinfo2.Delivered.Consumer + if a != b { + t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) + } + + // Compare against consumer that was created before resource limits error + // with one created AFTER the step down. + a = cinfo.Delivered.Consumer + b = cinfo3.Delivered.Consumer + if a != b { + t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) + } + + // Compare consumers created after the resource limits error. + a = cinfo2.Delivered.Consumer + b = cinfo3.Delivered.Consumer + if a != b { + t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) + } +} diff --git a/server/jwt_test.go b/server/jwt_test.go index e5bbd223dcc..28314150f6f 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -5467,11 +5467,8 @@ func TestJWTJetStreamTiers(t *testing.T) { require_Error(t, err) require_Equal(t, err.Error(), "nats: maximum consumers limit reached") _, err = js.Publish("testR1-3", msg[:]) - require_NoError(t, err) - _, err = js.Publish("testR1-3", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") - } func TestJWTJetStreamMaxAckPending(t *testing.T) { @@ -5629,8 +5626,6 @@ func TestJWTJetStreamMaxStreamBytes(t *testing.T) { require_NoError(t, err) // test if we can push more messages into the stream - _, err = js.Publish("baz", msg[:]) - require_NoError(t, err) _, err = js.Publish("baz", msg[:]) // exceeds max stream bytes require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") diff --git a/server/stream.go b/server/stream.go index 35361145729..8e1d75682a7 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4346,10 +4346,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Process additional msg headers if still present. var msgId string var rollupSub, rollupAll bool + isClustered := mset.isClustered() if len(hdr) > 0 { outq := mset.outq - isClustered := mset.isClustered() // Certain checks have already been performed if in clustered mode, so only check if not. // Note, for cluster mode but with message tracing (without message delivery), we need @@ -4598,6 +4598,22 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } + // If clustered this was already checked and we do not want to check here and possibly introduce skew. + if !isClustered { + if exceeded, err := jsa.wouldExceedLimits(stype, tierName, mset.cfg.Replicas, subject, hdr, msg); exceeded { + if err == nil { + err = NewJSAccountResourcesExceededError() + } + s.RateLimitWarnf("JetStream resource limits exceeded for account: %q", accName) + if canRespond { + resp.PubAck = &PubAck{Stream: name} + resp.Error = err + response, _ = json.Marshal(resp) + mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) + } + } + } + // Store actual msg. if lseq == 0 && ts == 0 { seq, ts, err = store.StoreMsg(subject, hdr, msg) @@ -4639,28 +4655,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return err } - if exceeded, apiErr := jsa.limitsExceeded(stype, tierName, mset.cfg.Replicas); exceeded { - s.RateLimitWarnf("JetStream resource limits exceeded for account: %q", accName) - if canRespond { - resp.PubAck = &PubAck{Stream: name} - if apiErr == nil { - resp.Error = NewJSAccountResourcesExceededError() - } else { - resp.Error = apiErr - } - response, _ = json.Marshal(resp) - mset.outq.sendMsg(reply, response) - } - // If we did not succeed put those values back. - var state StreamState - mset.store.FastState(&state) - mset.lseq = state.LastSeq - mset.lmsgId = olmsgId - mset.mu.Unlock() - store.RemoveMsg(seq) - return nil - } - // If we have a msgId make sure to save. if msgId != _EMPTY_ { mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})