diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3a8e6cedd194..77c7f427cee3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -235,6 +235,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] *Heartbeat* +- Added maintenance windows support for Heartbeat. {pull}41508[41508] *Metricbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 53e1dc6794bb..4459af37b0b7 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -22813,6 +22813,37 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/teambition/rrule-go +Version: v1.8.2 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/teambition/rrule-go@v1.8.2/LICENSE: + +MIT License + +Copyright (c) 2017-2023 Teambition + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + -------------------------------------------------------------------------------- Dependency : github.com/tklauser/go-sysconf Version: v0.3.12 diff --git a/go.mod b/go.mod index a2ca86b6a126..0208a5491c05 100644 --- a/go.mod +++ b/go.mod @@ -209,6 +209,7 @@ require ( github.com/pkg/xattr v0.4.9 github.com/prometheus/prometheus v0.300.1 github.com/shirou/gopsutil/v4 v4.25.4 + github.com/teambition/rrule-go v1.8.2 github.com/tklauser/go-sysconf v0.3.12 github.com/xdg-go/scram v1.1.2 github.com/zyedidia/generic v1.2.1 diff --git a/go.sum b/go.sum index 65e8fdfa7dd2..7976c95307e0 100644 --- a/go.sum +++ b/go.sum @@ -966,6 +966,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8= +github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= diff --git a/heartbeat/docs/monitors/monitor-common-options.asciidoc b/heartbeat/docs/monitors/monitor-common-options.asciidoc index 869c6a093462..d5bd391b75aa 100644 --- a/heartbeat/docs/monitors/monitor-common-options.asciidoc +++ b/heartbeat/docs/monitors/monitor-common-options.asciidoc @@ -146,6 +146,66 @@ Example: 
``` +[float] +[[monitor-maintenance-windows]] +==== `maintenance_windows` + +Use the `maintenance_windows` option to define recurring time periods during which a Heartbeat monitor is paused. Each window is described by a recurrence rule (`rrule`), which allows flexible scheduling of maintenance windows. + +`maintenance_windows` is a list; each entry supports the following fields: + +* `freq`: Specifies the frequency of the maintenance window. Supported values are `daily`, `weekly`, `monthly`, and `yearly`. +* `dtstart`: The start date and time of the first occurrence of the maintenance window in RFC 3339 format (for example, `2024-11-04T01:00:00Z`). This value must not be more than two years in the past, to prevent excessive `rrule` iterations. +* `interval`: (Optional) The interval at which the rule repeats. For example, an interval of `1` with `freq: weekly` means the maintenance window occurs every week, and an interval of `2` means every other week. +* `byweekday`: (Optional) Specifies the days of the week when the maintenance window occurs. Accepted values are `MO`, `TU`, `WE`, `TH`, `FR`, `SA`, and `SU`. +* `byhour`: (Optional) Specifies the hour(s) of the day when the maintenance window should trigger. +* `byminute`: (Optional) Specifies the minute(s) of the hour when the maintenance window should trigger. +* `bysecond`: (Optional) Specifies the second(s) of the minute when the maintenance window should trigger. +* `byeaster`: (Optional) Specifies the offset from Easter Sunday for the maintenance window. +* `bysetpos`: (Optional) Specifies the nth occurrence in the set of recurrence values. +* `bymonth`: (Optional) Specifies the month(s) when the maintenance window should trigger. +* `byweekno`: (Optional) Specifies the week(s) of the year when the maintenance window should trigger. +* `byyearday`: (Optional) Specifies the day(s) of the year when the maintenance window should trigger. +* `bymonthday`: (Optional) Specifies the day(s) of the month when the maintenance window should trigger. +* `wkst`: (Optional) Specifies the starting day of the week (e.g., `MO` for Monday). 
+* `duration`: The duration of each maintenance window, given as a duration string such as `30m` or `1h`. +* `count`: (Optional) The number of times the maintenance window should occur before stopping. + +Example: + +```yaml +- type: http + # ID used to uniquely identify this monitor + id: my-monitor + # List of URLs to query + urls: ["http://localhost:9200"] + # Define maintenance windows + maintenance_windows: + - freq: daily + dtstart: "2024-11-04T01:00:00.000Z" + interval: 1 + byweekday: [MO, TU, WE, TH, FR] + byhour: [1, 2] + byminute: [0, 30] + bysecond: [0] + bymonth: [1, 6, 12] + byweekno: [10, 20, 30] + byyearday: [100, 200, 300] + bymonthday: [1, 15, 31] + wkst: MO + duration: 1h # 1 hour + count: 10 +``` + +This example sets every available option to show the syntax. The options combine: an occurrence must satisfy all of them, so a real configuration normally sets only a few. For instance, `freq: daily` with `byweekday: [MO, TU, WE, TH, FR]`, `byhour: [1]`, `byminute: [0]`, `bysecond: [0]`, and a `duration` of `1h` pauses the monitor every weekday from 01:00 to 02:00 UTC, and `count: 10` stops the rule after ten occurrences. + +**Limitations:** + +- Only `daily`, `weekly`, `monthly`, and `yearly` frequencies are supported. +- `dtstart` must not be older than two years to limit the number of `rrule` iterations. + + + [float] [[monitor-fields]] ==== `fields` diff --git a/heartbeat/monitors/maintwin/maintwin.go b/heartbeat/monitors/maintwin/maintwin.go new file mode 100644 index 000000000000..515277422fcc --- /dev/null +++ b/heartbeat/monitors/maintwin/maintwin.go @@ -0,0 +1,119 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package maintwin describes recurring maintenance windows and answers
+// whether a given instant falls inside one of them.
+package maintwin
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/teambition/rrule-go"
+)
+
+// maxDtstartAgeYears is how far in the past dtstart may lie when Parse is
+// asked to validate it.
+const maxDtstartAgeYears = 2
+
+// weekdayLookup maps the two-letter weekday codes accepted in `byweekday`
+// to their rrule equivalents.
+var weekdayLookup = map[string]rrule.Weekday{
+	"MO": rrule.MO,
+	"TU": rrule.TU,
+	"WE": rrule.WE,
+	"TH": rrule.TH,
+	"FR": rrule.FR,
+	"SA": rrule.SA,
+	"SU": rrule.SU,
+}
+
+// MaintWin is the user-facing configuration of a single maintenance window.
+// Apart from Duration, its fields are handed to the rrule library by Parse.
+type MaintWin struct {
+	Freq     string        `config:"freq" validate:"required"`
+	Dtstart  string        `config:"dtstart" validate:"required"`
+	Interval int           `config:"interval"`
+	Duration time.Duration `config:"duration" validate:"required"`
+	// NOTE(review): rrule.Weekday is a library type and no custom unpacker is
+	// visible here; confirm that a config value such as `MO` can populate it.
+	Wkst       rrule.Weekday `config:"wkst"`
+	Count      int           `config:"count"`
+	Bysetpos   []int         `config:"bysetpos"`
+	Bymonth    []int         `config:"bymonth"`
+	Bymonthday []int         `config:"bymonthday"`
+	Byyearday  []int         `config:"byyearday"`
+	Byweekno   []int         `config:"byweekno"`
+	Byweekday  []string      `config:"byweekday"`
+	Byhour     []int         `config:"byhour"`
+	Byminute   []int         `config:"byminute"`
+	Bysecond   []int         `config:"bysecond"`
+	Byeaster   []int         `config:"byeaster"`
+}
+
+// Parse validates the maintenance window and converts it into a recurrence
+// rule.
+//
+// An error is returned when the frequency is unknown or finer than daily,
+// when the duration is not positive, when dtstart is not an RFC 3339
+// timestamp, when byweekday contains an unknown weekday code, or when the
+// rrule library rejects the resulting options. If validateDtStart is true,
+// a dtstart more than two years in the past is rejected as well.
+func (mw *MaintWin) Parse(validateDtStart bool) (*rrule.RRule, error) {
+	// validate the frequency, we don't support less than daily
+	freq, err := rrule.StrToFreq(strings.ToUpper(mw.Freq))
+	if err != nil || freq > rrule.DAILY {
+		return nil, fmt.Errorf("invalid frequency %s: only yearly, monthly, weekly, and daily are supported", mw.Freq)
+	}
+
+	// A window without a positive length can never be active, so treat it as
+	// a configuration mistake instead of silently never pausing the monitor.
+	if mw.Duration <= 0 {
+		return nil, fmt.Errorf("invalid duration %s: must be greater than zero", mw.Duration)
+	}
+
+	dtstart, err := time.Parse(time.RFC3339, mw.Dtstart)
+	if err != nil {
+		return nil, fmt.Errorf("invalid dtstart %q: %w", mw.Dtstart, err)
+	}
+
+	// validate DTSTART and make sure it's not older than 2 years
+	if validateDtStart && dtstart.Before(time.Now().AddDate(-maxDtstartAgeYears, 0, 0)) {
+		return nil, fmt.Errorf(
+			"invalid dtstart: %s is more than 2 years in the past. "+
+				"To prevent excessive iterations, please use a more recent date",
+			dtstart.Format(time.RFC3339),
+		)
+	}
+
+	// Convert the string weekdays to rrule.Weekday. Unknown codes are an
+	// error: dropping them would silently widen the window to more days than
+	// the user asked for.
+	weekdays := make([]rrule.Weekday, 0, len(mw.Byweekday))
+	for _, wd := range mw.Byweekday {
+		weekday, ok := weekdayLookup[strings.ToUpper(strings.TrimSpace(wd))]
+		if !ok {
+			return nil, fmt.Errorf("invalid byweekday value %q: expected one of MO, TU, WE, TH, FR, SA, SU", wd)
+		}
+		weekdays = append(weekdays, weekday)
+	}
+
+	rule, err := rrule.NewRRule(rrule.ROption{
+		Freq:       freq,
+		Count:      mw.Count,
+		Dtstart:    dtstart.UTC(),
+		Interval:   mw.Interval,
+		Byweekday:  weekdays,
+		Byhour:     mw.Byhour,
+		Byminute:   mw.Byminute,
+		Bysecond:   mw.Bysecond,
+		Byeaster:   mw.Byeaster,
+		Bysetpos:   mw.Bysetpos,
+		Bymonth:    mw.Bymonth,
+		Byweekno:   mw.Byweekno,
+		Byyearday:  mw.Byyearday,
+		Bymonthday: mw.Bymonthday,
+		Wkst:       mw.Wkst,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("building recurrence rule: %w", err)
+	}
+
+	return rule, nil
+}
+
+// ParsedMaintWin pairs a parsed recurrence rule with the length of each
+// occurrence.
+type ParsedMaintWin struct {
+	Rule     *rrule.RRule
+	Duration time.Duration
+}
+
+// IsActive reports whether tOrig falls inside an occurrence of the window.
+// A ParsedMaintWin without a rule is never active.
+func (pmw ParsedMaintWin) IsActive(tOrig time.Time) bool {
+	if pmw.Rule == nil {
+		return false
+	}
+	tOrig = tOrig.UTC()
+	// Ask the rule for the occurrence at or before tOrig (inclusive); tOrig
+	// is inside the window if that occurrence has not yet lasted Duration.
+	window := pmw.Rule.Before(tOrig, true)
+	return !window.IsZero() && tOrig.Before(window.Add(pmw.Duration))
+}
diff --git a/heartbeat/monitors/maintwin/maintwin_test.go b/heartbeat/monitors/maintwin/maintwin_test.go new file mode 100644 index 000000000000..32cf47e68022 --- /dev/null +++ b/heartbeat/monitors/maintwin/maintwin_test.go @@ -0,0 +1,187 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package maintwin
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestMaintWin parses each window and checks that IsActive is true for the
+// timestamps in positiveMatches and false for those in negativeMatches.
+func TestMaintWin(t *testing.T) {
+	// Capture the clock once so that dtstart and every probe of the first
+	// case are derived from the same instant.
+	now := time.Now()
+	fromNow := func(d time.Duration) string { return now.Add(d).Format(time.RFC3339) }
+
+	cases := []struct {
+		name            string
+		mw              MaintWin
+		positiveMatches []string
+		negativeMatches []string
+	}{
+		{
+			name: "Daily window starting now for 2 hours",
+			mw: MaintWin{
+				Freq:      "daily",
+				Dtstart:   now.Format(time.RFC3339),
+				Duration:  mustParseDuration("2h"),
+				Byweekday: []string{"SU", "MO", "TU", "WE", "TH", "FR", "SA"},
+				Count:     10,
+			},
+			// 30 minutes, 1 hour and 1 hour 30 minutes after the start time
+			positiveMatches: []string{fromNow(30 * time.Minute), fromNow(60 * time.Minute), fromNow(90 * time.Minute)},
+			// 3 hours and 9 hours after the start time, both past the 2 hour window
+			negativeMatches: []string{fromNow(180 * time.Minute), fromNow(540 * time.Minute)},
+		},
+
+		{
+			name: "Daily maintenance window for 2 hours",
+			mw: MaintWin{
+				Freq:     "daily",
+				Dtstart:  "2025-02-06T21:00:00Z",
+				Duration: mustParseDuration("2h"),
+			},
+			positiveMatches: []string{"2025-02-06T21:30:00Z", "2025-02-06T22:45:00Z"},
+			negativeMatches: []string{"2025-02-06T23:01:00Z", "2025-02-07T00:00:00Z"},
+		},
+
+		{
+			name: "Monthly maintenance window on the 1st",
+			mw: MaintWin{
+				Freq:       "monthly",
+				Dtstart:    "2025-02-01T10:00:00Z",
+				Duration:   mustParseDuration("2h"),
+				Bymonthday: []int{1},
+			},
+			positiveMatches: []string{"2025-03-01T10:30:00Z", "2025-04-01T11:45:00Z"},
+			negativeMatches: []string{"2025-02-02T10:30:00Z", "2025-02-01T12:01:00Z"},
+		},
+
+		{
+			name: "Weekly on Monday and Wednesday from 8 AM to 10 AM",
+			mw: MaintWin{
+				Freq:      "weekly",
+				Dtstart:   "2025-02-03T08:00:00Z",
+				Duration:  mustParseDuration("2h"),
+				Byweekday: []string{"MO", "WE"},
+			},
+			positiveMatches: []string{"2025-02-10T09:30:00Z", "2025-02-12T08:15:00Z"},
+			negativeMatches: []string{"2025-02-10T10:30:00Z", "2025-02-11T09:30:00Z"},
+		},
+
+		{
+			name: "First Friday of every month",
+			mw: MaintWin{
+				Freq:      "monthly",
+				Dtstart:   "2025-02-07T12:00:00Z",
+				Duration:  mustParseDuration("2h"),
+				Byweekday: []string{"FR"},
+				Bysetpos:  []int{1}, // First Friday of the month
+			},
+			positiveMatches: []string{"2025-03-07T12:30:00Z"},
+			negativeMatches: []string{"2025-02-14T12:30:00Z", "2025-04-14T13:00:00Z"},
+		},
+
+		{
+			name: "Every Saturday and Sunday from 5 PM to 8 PM",
+			mw: MaintWin{
+				Freq:      "weekly",
+				Dtstart:   "2025-02-08T17:00:00Z",
+				Duration:  mustParseDuration("3h"),
+				Byweekday: []string{"SA", "SU"},
+			},
+			positiveMatches: []string{"2025-02-09T18:30:00Z", "2025-02-15T19:00:00Z"},
+			negativeMatches: []string{"2025-02-09T20:30:00Z", "2025-02-10T17:30:00Z"},
+		},
+
+		{
+			name: "Monthly on the 15th from 6 AM to 9 AM",
+			mw: MaintWin{
+				Freq:       "monthly",
+				Dtstart:    "2025-02-15T06:00:00Z",
+				Duration:   mustParseDuration("3h"),
+				Bymonthday: []int{15},
+			},
+			positiveMatches: []string{"2025-03-15T07:30:00Z", "2025-04-15T08:45:00Z"},
+			negativeMatches: []string{"2025-02-16T07:30:00Z", "2025-02-15T09:30:00Z"},
+		},
+
+		{
+			name: "Yearly maintenance on Jan 1 from Midnight to 3 AM",
+			mw: MaintWin{
+				Freq:       "yearly",
+				Dtstart:    "2025-01-01T00:00:00Z",
+				Duration:   mustParseDuration("3h"),
+				Bymonthday: []int{1},
+			},
+			positiveMatches: []string{"2026-01-01T01:30:00Z", "2027-01-01T02:45:00Z"},
+			negativeMatches: []string{"2025-01-02T01:30:00Z", "2025-01-01T03:30:00Z"},
+		},
+
+		{
+			name: "Every other day for 4 hours",
+			mw: MaintWin{
+				Freq:     "daily",
+				Dtstart:  "2025-02-06T08:00:00Z",
+				Duration: mustParseDuration("4h"),
+				Interval: 2, // Every other day
+				Count:    10,
+			},
+			positiveMatches: []string{"2025-02-08T09:30:00Z", "2025-02-10T11:00:00Z"},
+			negativeMatches: []string{"2025-02-07T09:30:00Z", "2025-02-06T13:00:00Z"},
+		},
+		{
+			name: "Every day",
+			mw: MaintWin{
+				Freq:     "daily",
+				Dtstart:  "2005-02-06T08:00:00Z",
+				Duration: mustParseDuration("1h"),
+			},
+			positiveMatches: []string{"2025-02-08T08:30:00Z"},
+			negativeMatches: []string{"2025-02-07T09:30:00Z"},
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			// dtstart age validation is disabled because several cases use
+			// fixed dates that may be more than two years old when the test runs.
+			r, err := c.mw.Parse(false)
+			require.NoError(t, err)
+			pmw := ParsedMaintWin{Rule: r, Duration: c.mw.Duration}
+			for _, m := range c.positiveMatches {
+				t.Run(fmt.Sprintf("does match %s", m), func(t *testing.T) {
+					pt, err := time.Parse(time.RFC3339, m)
+					require.NoError(t, err)
+					assert.True(t, pmw.IsActive(pt))
+				})
+			}
+			for _, m := range c.negativeMatches {
+				t.Run(fmt.Sprintf("does not match %s", m), func(t *testing.T) {
+					pt, err := time.Parse(time.RFC3339, m)
+					require.NoError(t, err)
+					assert.False(t, pmw.IsActive(pt))
+				})
+			}
+		})
+	}
+}
+
+// TestMaintWinParseErrors checks that Parse rejects configurations it cannot
+// or must not turn into a recurrence rule.
+func TestMaintWinParseErrors(t *testing.T) {
+	cases := []struct {
+		name            string
+		mw              MaintWin
+		validateDtStart bool
+	}{
+		{
+			name: "frequency finer than daily",
+			mw: MaintWin{
+				Freq:     "hourly",
+				Dtstart:  "2025-02-06T21:00:00Z",
+				Duration: mustParseDuration("1h"),
+			},
+		},
+		{
+			name: "unknown frequency",
+			mw: MaintWin{
+				Freq:     "fortnightly",
+				Dtstart:  "2025-02-06T21:00:00Z",
+				Duration: mustParseDuration("1h"),
+			},
+		},
+		{
+			name: "dtstart is not RFC 3339",
+			mw: MaintWin{
+				Freq:     "daily",
+				Dtstart:  "06/02/2025 21:00",
+				Duration: mustParseDuration("1h"),
+			},
+		},
+		{
+			name: "dtstart more than two years in the past",
+			mw: MaintWin{
+				Freq:     "daily",
+				Dtstart:  "2005-02-06T08:00:00Z",
+				Duration: mustParseDuration("1h"),
+			},
+			validateDtStart: true,
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			r, err := c.mw.Parse(c.validateDtStart)
+			require.Error(t, err)
+			assert.Nil(t, r)
+		})
+	}
+}
+
+// mustParseDuration parses s with time.ParseDuration and panics if it is not
+// a valid duration; it is only meant for literals in test tables.
+func mustParseDuration(s string) time.Duration {
+	d, err := time.ParseDuration(s)
+	if err != nil {
+		panic(fmt.Sprintf("could not parse duration %s: %s", s, err))
+	}
+	return d
+}
diff --git a/heartbeat/monitors/stdfields/stdfields.go b/heartbeat/monitors/stdfields/stdfields.go index ae721a746111..e83e23a7f0d5 100644 --- a/heartbeat/monitors/stdfields/stdfields.go +++ b/heartbeat/monitors/stdfields/stdfields.go @@ -22,6 +22,7 @@ import ( "time" hbconfig "github.com/elastic/beats/v7/heartbeat/config" + "github.com/elastic/beats/v7/heartbeat/monitors/maintwin" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/elastic-agent-libs/config" ) @@ -32,15 +33,17 @@ type ServiceFields struct { // StdMonitorFields represents the generic configuration options around a monitor plugin. 
type StdMonitorFields struct { - ID string `config:"id"` - Name string `config:"name"` - Type string `config:"type" validate:"required"` - Schedule *schedule.Schedule `config:"schedule" validate:"required"` - Timeout time.Duration `config:"timeout"` - Service ServiceFields `config:"service"` - Origin string `config:"origin"` - LegacyServiceName string `config:"service_name"` - MaxAttempts uint16 `config:"max_attempts"` + ID string `config:"id"` + Name string `config:"name"` + Type string `config:"type" validate:"required"` + Schedule *schedule.Schedule `config:"schedule" validate:"required"` + MaintenanceWindows []maintwin.MaintWin `config:"maintenance_windows" ` + ParsedMainteWin []maintwin.ParsedMaintWin + Timeout time.Duration `config:"timeout"` + Service ServiceFields `config:"service"` + Origin string `config:"origin"` + LegacyServiceName string `config:"service_name"` + MaxAttempts uint16 `config:"max_attempts"` // Used by zip_url and local monitors // kibana originating monitors only run one journey at a time // and just use the `fields` syntax / manually set monitor IDs @@ -77,5 +80,13 @@ func ConfigToStdMonitorFields(conf *config.C) (StdMonitorFields, error) { sFields.IsLegacyBrowserSource = true } + for _, mw := range sFields.MaintenanceWindows { + parsed, err := mw.Parse(true) + if err != nil { + return StdMonitorFields{}, fmt.Errorf("could not parse maintenance window for monitor (id:%s name:%s): %w", sFields.ID, sFields.Name, err) + } + sFields.ParsedMainteWin = append(sFields.ParsedMainteWin, maintwin.ParsedMaintWin{Rule: parsed, Duration: mw.Duration}) + } + return sFields, nil } diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index a655e1d15467..44fa6dfa3a47 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -84,7 +84,7 @@ func (t *configuredJob) Start(pubClient beat.Client) { return } - t.cancelFn, err = t.monitor.addTask(t.config.Schedule, t.monitor.stdFields.ID, t.makeSchedulerTaskFunc(), 
t.config.Type) + t.cancelFn, err = t.monitor.addTask(t.config.Schedule, t.monitor.stdFields.ParsedMainteWin, t.monitor.stdFields.ID, t.makeSchedulerTaskFunc(), t.config.Type) if err != nil { logp.L().Infof("could not start monitor: %v", err) } diff --git a/heartbeat/scheduler/scheduler.go b/heartbeat/scheduler/scheduler.go index eb9ba2f463ce..354d81a82a27 100644 --- a/heartbeat/scheduler/scheduler.go +++ b/heartbeat/scheduler/scheduler.go @@ -28,6 +28,7 @@ import ( "golang.org/x/sync/semaphore" "github.com/elastic/beats/v7/heartbeat/config" + "github.com/elastic/beats/v7/heartbeat/monitors/maintwin" "github.com/elastic/beats/v7/heartbeat/scheduler/timerqueue" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -161,11 +162,11 @@ func (s *Scheduler) WaitForRunOnce() { // has already stopped. var ErrAlreadyStopped = errors.New("attempted to add job to already stopped scheduler") -type AddTask func(sched Schedule, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) +type AddTask func(sched Schedule, pmws []maintwin.ParsedMaintWin, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) // Add adds the given TaskFunc to the current scheduler. Will return an error if the scheduler // is done. 
-func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) { +func (s *Scheduler) Add(sched Schedule, pmws []maintwin.ParsedMaintWin, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) { if errors.Is(s.ctx.Err(), context.Canceled) { return nil, ErrAlreadyStopped } @@ -178,7 +179,7 @@ func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType var taskFn timerqueue.TimerTaskFn - taskFn = func(_ time.Time) { + taskFn = func(now time.Time) { select { case <-jobCtx.Done(): debugf("Job '%v' canceled", id) @@ -189,7 +190,22 @@ func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType debugf("Job '%s' started", id) sj := newSchedJob(jobCtx, s, id, jobType, entrypoint) - lastRanAt := sj.run() + var activeMainWin *maintwin.ParsedMaintWin + for _, pmw := range pmws { + if pmw.IsActive(now) { + pmwCopy := pmw + activeMainWin = &pmwCopy + break + } + } + + var lastRanAt time.Time + if activeMainWin == nil { + lastRanAt = sj.run() + } else { + logp.L().Infof("Job '%s' is in maintenance window '%s' , skipping", id, activeMainWin.Rule) + lastRanAt = now + } s.stats.activeJobs.Dec() if s.runOnce { diff --git a/heartbeat/scheduler/scheduler_test.go b/heartbeat/scheduler/scheduler_test.go index a133f8cd8e48..6109d88462d3 100644 --- a/heartbeat/scheduler/scheduler_test.go +++ b/heartbeat/scheduler/scheduler_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/heartbeat/config" + "github.com/elastic/beats/v7/heartbeat/monitors/maintwin" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -83,10 +84,13 @@ func TestSchedulerRun(t *testing.T) { s := Create(10, monitoring.NewRegistry(), tarawaTime(), nil, false) defer s.Stop() + mainWin := maintwin.ParsedMaintWin{} + mainWins := []maintwin.ParsedMaintWin{mainWin} + executed := make(chan string) initialEvents := uint32(10) - _, err := 
s.Add(testSchedule{0}, "add", testTaskTimes(initialEvents, func(_ context.Context) []TaskFunc { + _, err := s.Add(testSchedule{0}, mainWins, "add", testTaskTimes(initialEvents, func(_ context.Context) []TaskFunc { executed <- "initial" cont := func(_ context.Context) []TaskFunc { executed <- "initialCont" @@ -109,13 +113,13 @@ func TestSchedulerRun(t *testing.T) { } // Attempt to execute this twice to see if remove() had any effect removeMtx.Lock() - remove, err = s.Add(testSchedule{}, "removed", testTaskTimes(removedEvents+1, testFn), "http") + remove, err = s.Add(testSchedule{}, mainWins, "removed", testTaskTimes(removedEvents+1, testFn), "http") require.NoError(t, err) require.NotNil(t, remove) removeMtx.Unlock() postRemoveEvents := uint32(10) - _, err = s.Add(testSchedule{}, "postRemove", testTaskTimes(postRemoveEvents, func(_ context.Context) []TaskFunc { + _, err = s.Add(testSchedule{}, mainWins, "postRemove", testTaskTimes(postRemoveEvents, func(_ context.Context) []TaskFunc { executed <- "postRemove" cont := func(_ context.Context) []TaskFunc { executed <- "postRemoveCont" @@ -130,6 +134,7 @@ func TestSchedulerRun(t *testing.T) { // Otherwise, we might only do 1 preAdd and 1 postRemove event // We double the number of pre/post add events to account for their continuations totalExpected := initialEvents*2 + removedEvents + postRemoveEvents*2 + //nolint:gosec // G115 can be ignored as this input is static for uint32(len(received)) < totalExpected { select { case got := <-executed: @@ -159,9 +164,12 @@ func TestScheduler_WaitForRunOnce(t *testing.T) { defer s.Stop() + mainWin := maintwin.ParsedMaintWin{} + mainWins := []maintwin.ParsedMaintWin{mainWin} + executed := new(uint32) - _, err := s.Add(testSchedule{0}, "runOnce", func(_ context.Context) []TaskFunc { + _, err := s.Add(testSchedule{0}, mainWins, "runOnce", func(_ context.Context) []TaskFunc { cont := func(_ context.Context) []TaskFunc { // Make sure we actually wait for the task! 
time.Sleep(time.Millisecond * 250) @@ -180,10 +188,12 @@ func TestScheduler_Stop(t *testing.T) { s := Create(10, monitoring.NewRegistry(), tarawaTime(), nil, false) executed := make(chan struct{}) + mainWin := maintwin.ParsedMaintWin{} + mainWins := []maintwin.ParsedMaintWin{mainWin} s.Stop() - _, err := s.Add(testSchedule{}, "testPostStop", testTaskTimes(1, func(_ context.Context) []TaskFunc { + _, err := s.Add(testSchedule{}, mainWins, "testPostStop", testTaskTimes(1, func(_ context.Context) []TaskFunc { executed <- struct{}{} return nil }), "http") @@ -279,10 +289,12 @@ func BenchmarkScheduler(b *testing.B) { s := Create(0, monitoring.NewRegistry(), tarawaTime(), nil, false) sched := testSchedule{0} + mainWin := maintwin.ParsedMaintWin{} + mainWins := []maintwin.ParsedMaintWin{mainWin} executed := make(chan struct{}) for i := 0; i < 1024; i++ { - _, err := s.Add(sched, "testPostStop", func(_ context.Context) []TaskFunc { + _, err := s.Add(sched, mainWins, "testPostStop", func(_ context.Context) []TaskFunc { executed <- struct{}{} return nil }, "http")