diff --git a/server/filestore.go b/server/filestore.go index 420e55bd759..9eba96f2572 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6093,9 +6093,15 @@ func (fs *fileStore) runMsgScheduling() { fs.mu.Lock() defer fs.mu.Unlock() - if fs.scheduling == nil || fs.pmsgcb == nil { + // If scheduling is enabled, but handler isn't set up yet. Try again later. + if fs.scheduling == nil { + return + } + if fs.pmsgcb == nil { + fs.scheduling.resetTimer() return } + fs.scheduling.running = true scheduledMsgs := fs.scheduling.getScheduledMessages(func(seq uint64, smv *StoreMsg) *StoreMsg { sm, _ := fs.msgForSeqLocked(seq, smv, false) @@ -6109,9 +6115,8 @@ func (fs *fileStore) runMsgScheduling() { fs.mu.Lock() } - if fs.scheduling != nil { - fs.scheduling.resetTimer() - } + fs.scheduling.running, fs.scheduling.deadline = false, 0 + fs.scheduling.resetTimer() } // Lock should be held. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 8e5c89a2d4a..028f1a5547e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22127,3 +22127,76 @@ func TestJetStreamMessageTTLNotExpiring(t *testing.T) { }) } } + +func TestJetStreamScheduledMessageNotTriggering(t *testing.T) { + for _, storageType := range []StorageType{FileStorage, MemoryStorage} { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := jsStreamCreate(t, nc, &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Storage: storageType, + AllowMsgSchedules: true, + }) + require_NoError(t, err) + + delay := func(d time.Duration) string { + return fmt.Sprintf("@at %s", time.Now().Add(d).Format(time.RFC3339Nano)) + } + + // Triggers the schedule timer once, and needs to be reset to trigger earlier. + m := nats.NewMsg("foo.schedule.first") + m.Header.Set("Nats-Schedule", delay(time.Hour)) + m.Header.Set("Nats-Schedule-Target", "foo.msg") + _, err = js.PublishMsg(m) + require_NoError(t, err) + + // Storing messages with a schedule would continuously reset the timer. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer wg.Done() + var i int + for { + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + i++ + ms := nats.NewMsg(fmt.Sprintf("foo.schedule.%d", i)) + ms.Header.Set("Nats-Schedule", delay(time.Hour)) + ms.Header.Set("Nats-Schedule-Target", "foo.msg") + js.PublishMsg(ms) + } + } + }() + + // The message should be scheduled timely. + m = nats.NewMsg("foo.schedule.validate") + m.Header.Set("Nats-Schedule", delay(time.Second)) + m.Header.Set("Nats-Schedule-Target", "foo.msg") + _, err = js.PublishMsg(m) + require_NoError(t, err) + pubAck, err := js.PublishMsg(m) + require_NoError(t, err) + checkFor(t, 3*time.Second, 100*time.Millisecond, func() error { + _, err = js.GetMsg("TEST", pubAck.Sequence) + if err == nil { + return fmt.Errorf("message not removed yet") + } + if !errors.Is(err, nats.ErrMsgNotFound) { + return err + } + return nil + }) + }) + } +} diff --git a/server/memstore.go b/server/memstore.go index 92db6717d5e..ab33135f8b4 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1326,9 +1326,15 @@ func (ms *memStore) runMsgScheduling() { ms.mu.Lock() defer ms.mu.Unlock() - if ms.scheduling == nil || ms.pmsgcb == nil { + // If scheduling is enabled, but handler isn't set up yet. Try again later. + if ms.scheduling == nil { return } + if ms.pmsgcb == nil { + ms.scheduling.resetTimer() + return + } + ms.scheduling.running = true scheduledMsgs := ms.scheduling.getScheduledMessages(func(seq uint64, smv *StoreMsg) *StoreMsg { sm, _ := ms.loadMsgLocked(seq, smv, false) @@ -1342,9 +1348,8 @@ func (ms *memStore) runMsgScheduling() { ms.mu.Lock() } - if ms.scheduling != nil { - ms.scheduling.resetTimer() - } + ms.scheduling.running, ms.scheduling.deadline = false, 0 + ms.scheduling.resetTimer() } // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. diff --git a/server/scheduler.go b/server/scheduler.go index fc626471b15..96e3e7d64c8 100644 --- a/server/scheduler.go +++ b/server/scheduler.go @@ -35,6 +35,8 @@ type MsgScheduling struct { run func() ttls *thw.HashWheel timer *time.Timer + running bool + deadline int64 schedules map[string]*MsgSchedule seqToSubj map[uint64]string inflight map[string]struct{} @@ -98,6 +100,12 @@ func (ms *MsgScheduling) clearInflight() { } func (ms *MsgScheduling) resetTimer() { + // If we're already scheduling messages, it will make sure to reset. + // Don't trigger again, as that could result in many expire goroutines. + if ms.running { + return + } + next := ms.ttls.GetNextExpiration(math.MaxInt64) if next == math.MaxInt64 { clearTimer(&ms.timer) @@ -111,6 +119,14 @@ func (ms *MsgScheduling) resetTimer() { fireIn = 250 * time.Millisecond } + // If we want to kick the timer to run later than what was assigned before, don't reset it. + // Otherwise, we could get in a situation where the timer is continuously reset, and it never runs. + deadline := time.Now().UnixNano() + fireIn.Nanoseconds() + if ms.deadline > 0 && deadline > ms.deadline { + return + } + + ms.deadline = deadline if ms.timer != nil { ms.timer.Reset(fireIn) } else {