Skip to content
This repository has been archived by the owner on Dec 6, 2023. It is now read-only.

Commit

Permalink
Server: catch up on missed ticks (#218)
Browse files Browse the repository at this point in the history
* server: warn on missed ticks

On heavy load, or when upgrading the server, the plugin might not
tick every seconds. To better quantify this, log when some ticks
are missed.

* server: catch up on missed ticks

Catches up ticks up to 10 minutes (as longer may overload the server
when starting up after being stopped a very long time).

If it takes longer to catch up for ticks that a tick, the missed
ticks will be caught up on the next turn.
  • Loading branch information
kemenaran authored Mar 14, 2023
1 parent 8d8e425 commit 1f98568
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 7 deletions.
60 changes: 59 additions & 1 deletion server/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,42 @@ type ReminderRequest struct {
}

func (p *Plugin) TriggerReminders() {
tickAt := time.Now().UTC().Round(time.Second)
lastTickAt := p.getLastTickTimeWithDefault(tickAt)

// Before handling more operations, save the updated LastTickAt time
p.setLastTickTime(tickAt)

// Catch up on missed ticks (if any)
tickDelta := tickAt.Sub(lastTickAt)
ticksMissed := tickDelta.Seconds() - 1
if ticksMissed > 0 {
oneSecond := time.Second
maxCatchupDuration, _ := time.ParseDuration("-10m")
catchupStart := lastTickAt.Add(oneSecond)
earliestCatchupStart := tickAt.Add(maxCatchupDuration)

if (catchupStart.Before(earliestCatchupStart)) {
catchupStart = earliestCatchupStart
p.API.LogInfo(fmt.Sprintf("Too many reminder ticks were missed: occurrences between %v and %v will be dropped.", lastTickAt, catchupStart))
}

p.API.LogDebug(fmt.Sprintf("Catching up on %v reminder tick(s)...", tickAt.Sub(catchupStart).Seconds()))
for tick := catchupStart; tick.Before(tickAt); tick = tick.Add(oneSecond) {
p.TriggerRemindersForTick(tick)
}
p.API.LogDebug("Caught up on missed reminder ticks.")
}

bytes, err := p.API.KVGet(string(fmt.Sprintf("%v", time.Now().UTC().Round(time.Second))))
// Trigger the actual tick
p.TriggerRemindersForTick(tickAt)
}

func (p *Plugin) TriggerRemindersForTick(tickAt time.Time) {
p.API.LogDebug("Trigger reminders for " + fmt.Sprintf("%v", tickAt))

// Look up reminders to be triggered for the tick time
bytes, err := p.API.KVGet(string(fmt.Sprintf("%v", tickAt)))
if err != nil {
p.API.LogError("failed KVGet %s", err)
}
Expand Down Expand Up @@ -584,3 +617,28 @@ func (p *Plugin) findReminder(reminders []Reminder, occurrence Occurrence) Remin
}
return Reminder{}
}

func (p *Plugin) getLastTickTimeWithDefault(defaultValue time.Time) time.Time {
bytes, err := p.API.KVGet("LastTickAt")
if err != nil {
p.API.LogInfo(fmt.Sprintf("Failed to read LastTickAt (%v); returning the default value", err))
return defaultValue
}
if bytes == nil {
p.API.LogDebug("LastTickAt is not set; returning the default value")
return defaultValue
}

lastTickAt, parseErr := time.Parse(time.RFC3339, string(bytes[:]))
if parseErr != nil {
p.API.LogInfo(fmt.Sprintf("Failed to parse LastTickAt value (%v); returning the default value", parseErr))
return defaultValue
}

return lastTickAt
}

func (p *Plugin) setLastTickTime(lastTickAt time.Time) {
serializedTime := lastTickAt.Format(time.RFC3339)
p.API.KVSet("LastTickAt", []byte(serializedTime))
}
62 changes: 56 additions & 6 deletions server/reminder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,55 @@ import (
)

func TestTriggerReminders(t *testing.T) {
testTime := time.Now().UTC().Round(time.Second)
serializedTestTime := []byte(testTime.Format(time.RFC3339))

t.Run("it triggers reminders scheduled for the current time", func(t *testing.T) {
oneSecondAgo, _ := time.ParseDuration("-1s")
lastTickAt := testTime.Add(oneSecondAgo)
serializedLastTickAt := []byte(lastTickAt.Format(time.RFC3339))

api := &plugintest.API{}
api.On("KVGet", string("LastTickAt")).Return(serializedLastTickAt, nil)
api.On("KVSet", string("LastTickAt"), serializedTestTime).Return(nil)
api.On("LogDebug", "Trigger reminders for " + fmt.Sprintf("%v", testTime))
api.On("KVGet", string(fmt.Sprintf("%v", testTime))).Return(nil, nil)
defer api.AssertExpectations(t)

p := &Plugin{}
p.API = api

p.TriggerReminders()
})

t.Run("when ticks have been missed, it triggers reminders for the missed ticks as well", func(t *testing.T) {
oneSecondsAgo, _ := time.ParseDuration("-1s")
twoSecondsAgo, _ := time.ParseDuration("-2s")
threeSecondsAgo, _ := time.ParseDuration("-3s")
lastTickAt := testTime.Add(threeSecondsAgo)
serializedLastTickAt := []byte(lastTickAt.Format(time.RFC3339))

api := &plugintest.API{}
api.On("KVGet", string("LastTickAt")).Return(serializedLastTickAt, nil)
api.On("KVSet", string("LastTickAt"), serializedTestTime).Return(nil)
api.On("LogDebug", "Catching up on 2 reminder tick(s)...")
api.On("LogDebug", "Trigger reminders for " + fmt.Sprintf("%v", testTime.Add(twoSecondsAgo)))
api.On("KVGet", string(fmt.Sprintf("%v", testTime.Add(twoSecondsAgo)))).Return(nil, nil)
api.On("LogDebug", "Trigger reminders for " + fmt.Sprintf("%v", testTime.Add(oneSecondsAgo)))
api.On("KVGet", string(fmt.Sprintf("%v", testTime.Add(oneSecondsAgo)))).Return(nil, nil)
api.On("LogDebug", "Caught up on missed reminder ticks.")
api.On("LogDebug", "Trigger reminders for " + fmt.Sprintf("%v", testTime))
api.On("KVGet", string(fmt.Sprintf("%v", testTime))).Return(nil, nil)
defer api.AssertExpectations(t)

p := &Plugin{}
p.API = api

p.TriggerReminders()
})
}

func TestTriggerRemindersForTick(t *testing.T) {

user := &model.User{
Email: "[email protected]",
Expand Down Expand Up @@ -82,13 +131,14 @@ func TestTriggerReminders(t *testing.T) {

stringOccurrences, _ := json.Marshal(occurrences)
api := &plugintest.API{}
api.On("LogDebug", mock.Anything, mock.Anything, mock.Anything).Maybe()
api.On("KVGet", string(fmt.Sprintf("%v", testTime))).Return(stringOccurrences, nil)
defer api.AssertExpectations(t)

p := &Plugin{}
p.API = api

p.TriggerReminders()
p.TriggerRemindersForTick(testTime)

})

Expand All @@ -102,7 +152,7 @@ func TestTriggerReminders(t *testing.T) {
p := &Plugin{}
p.API = api

p.TriggerReminders()
p.TriggerRemindersForTick(testTime)

})

Expand All @@ -117,7 +167,7 @@ func TestTriggerReminders(t *testing.T) {
p := &Plugin{}
p.API = api

p.TriggerReminders()
p.TriggerRemindersForTick(testTime)

})

Expand All @@ -132,7 +182,7 @@ func TestTriggerReminders(t *testing.T) {
p := &Plugin{}
p.API = api

p.TriggerReminders()
p.TriggerRemindersForTick(testTime)

})

Expand Down Expand Up @@ -178,7 +228,7 @@ func TestTriggerReminders(t *testing.T) {
p := &Plugin{}
p.API = api

p.TriggerReminders()
p.TriggerRemindersForTick(testTime)

})

Expand Down Expand Up @@ -222,7 +272,7 @@ func TestTriggerReminders(t *testing.T) {
p := &Plugin{}
p.API = api

p.TriggerReminders()
p.TriggerRemindersForTick(testTime)

})

Expand Down

0 comments on commit 1f98568

Please sign in to comment.