From 1ee4a9e6c961c3d67a2242f58b8099b74fe00b63 Mon Sep 17 00:00:00 2001 From: Peter Takacs Date: Sat, 15 Oct 2022 03:43:35 +0200 Subject: [PATCH 1/2] Enhanced disk path scan --- internal/models/series_redis.go | 65 +++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/internal/models/series_redis.go b/internal/models/series_redis.go index 2dd429e..62c4d19 100644 --- a/internal/models/series_redis.go +++ b/internal/models/series_redis.go @@ -41,47 +41,56 @@ func (rsr *RedisSeriesRepository) FindDiskPaths(probe Probe) ([]string, error) { conn := rsr.RedisPool.Get() defer conn.Close() - cursor := 0 - prefix := string(probe) + ":" + seriesDiskKeyPrefix + strconv.FormatInt(today().Unix(), 10) + ":" - var paths []string + timestamps := rsr.timestamps(Month) - for { - values, err := redis.Values( - conn.Do("SCAN", cursor, "MATCH", prefix+"*"), - ) - if err != nil { - return nil, err - } + for i := len(timestamps) - 1; i >= 0; i-- { + cursor := 0 + prefix := string(probe) + ":" + seriesDiskKeyPrefix + strconv.FormatInt(timestamps[i], 10) + ":" - cursor, err = redis.Int(values[0], nil) - if err != nil { - return nil, err + for { + values, err := redis.Values( + conn.Do("SCAN", cursor, "MATCH", prefix+"*"), + ) + if err != nil { + return nil, err + } + + cursor, err = redis.Int(values[0], nil) + if err != nil { + return nil, err + } + + current, err := redis.Strings(values[1], nil) + if err != nil { + return nil, err + } + + paths = append(paths, current...) + + if cursor == 0 { + break + } } - current, err := redis.Strings(values[1], nil) - if err != nil { - return nil, err + if len(paths) == 0 { + continue } - paths = append(paths, current...) + for key, value := range paths { + path, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(value, prefix, "")) + if err != nil { + return nil, err + } - if cursor == 0 { - break + paths[key] = string(path) } - } - for key, value := range paths { - path, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(value, prefix, "")) - if err != nil { - return nil, err - } + sort.Strings(paths) - paths[key] = string(path) + break } - sort.Strings(paths) - return paths, nil } From 008cf53531a5276f23fa4a6be4b09f7fd4103f14 Mon Sep 17 00:00:00 2001 From: Peter Takacs Date: Sat, 15 Oct 2022 03:44:38 +0200 Subject: [PATCH 2/2] Optimized heartbeat invocations --- internal/models/probe_redis.go | 31 +++++++-- internal/web/heartbeat.go | 121 ++++++++++++++++++++------------- 2 files changed, 98 insertions(+), 54 deletions(-) diff --git a/internal/models/probe_redis.go b/internal/models/probe_redis.go index 65161d1..35b3644 100644 --- a/internal/models/probe_redis.go +++ b/internal/models/probe_redis.go @@ -72,7 +72,7 @@ func (rpr *RedisProbeRepository) FindLatestValues(probe Probe, limit int) ([]int conn := rpr.RedisPool.Get() defer conn.Close() - var fields []string + days := map[string][]string{} now := time.Now() end := time.Date( @@ -89,14 +89,31 @@ func (rpr *RedisProbeRepository) FindLatestValues(probe Probe, limit int) ([]int start := end.Add(-time.Duration(limit-1) * time.Minute) for current := start; !current.After(end); current = current.Add(time.Minute) { - fields = append(fields, strconv.FormatInt(current.Unix(), 10)) + day := strconv.FormatInt(time.Date( + current.Year(), + current.Month(), + current.Day(), + 0, + 0, + 0, + 0, + now.Location(), + ).Unix(), 10) + + days[day] = append(days[day], strconv.FormatInt(current.Unix(), 10)) } - values, err := redis.Values( - conn.Do("HMGET", redis.Args{}.Add(string(probe)+":"+seriesCPUKeyPrefix+strconv.FormatInt(today().Unix(), 10)).AddFlat(fields)...), - ) - if err != nil { - return nil, nil, err + var values []interface{} + + for day, fields := range days { + dayValues, err := redis.Values( + conn.Do("HMGET", redis.Args{}.Add(string(probe)+":"+seriesCPUKeyPrefix+day).AddFlat(fields)...), + ) + if err != nil { + return nil, nil, err + } + + values = append(values, dayValues...) } return values, &start, nil diff --git a/internal/web/heartbeat.go b/internal/web/heartbeat.go index a079ae0..9a52d0b 100644 --- a/internal/web/heartbeat.go +++ b/internal/web/heartbeat.go @@ -7,7 +7,10 @@ import ( "net/http" "strconv" "strings" + "sync" "time" + + "github.com/petaki/satellite/internal/models" ) func (a *app) heartbeat() { @@ -23,71 +26,95 @@ func (a *app) handleProbes() error { return err } + var wg sync.WaitGroup + + wg.Add(len(probes)) + for _, probe := range probes { - sendWebhook := true + go a.handleProbe(probe, &wg) + } - values, start, err := a.probeRepository.FindLatestValues(probe, a.heartbeatWait) - if err != nil { - return err - } + wg.Wait() - for _, value := range values { - if value != nil { - sendWebhook = false + return nil +} - break - } - } +func (a *app) handleProbe(probe models.Probe, wg *sync.WaitGroup) { + defer wg.Done() - if !sendWebhook { - return nil - } + sendWebhook := true - if a.heartbeatSleep > 0 { - hasHeartbeat, err := a.probeRepository.HasHeartbeat(probe) - if err != nil { - return err - } + values, start, err := a.probeRepository.FindLatestValues(probe, a.heartbeatWait) + if err != nil { + a.errorLog.Print(err) - if hasHeartbeat { - return nil - } - } + return + } - a.infoLog.Printf("Calling the heartbeat webhook URL for %s...", probe) + for _, value := range values { + if value != nil { + sendWebhook = false + + break + } + } - body := strings.ReplaceAll(a.heartbeatWebhookBody, "%p", string(probe)) - body = strings.ReplaceAll(body, "%t", start.Format(time.RFC3339)) - body = strings.ReplaceAll(body, "%x", strconv.FormatInt(start.Unix(), 10)) - body = strings.ReplaceAll(body, "%l", fmt.Sprintf("/cpu?probe=%s", probe)) + if !sendWebhook { + return + } - req, err := http.NewRequest(a.heartbeatWebhookMethod, a.heartbeatWebhookURL, bytes.NewBuffer([]byte(body))) + if a.heartbeatSleep > 0 { + hasHeartbeat, err := a.probeRepository.HasHeartbeat(probe) if err != nil { - return err - } + a.errorLog.Print(err) - for key, value := range a.heartbeatWebhookHeader { - req.Header.Set(key, value) + return } - resp, err := a.client.Do(req) - if err != nil { - return err + if hasHeartbeat { + return } + } - defer resp.Body.Close() + a.infoLog.Printf("Calling the heartbeat webhook URL for %s...", probe) - if resp.StatusCode > 400 { - return errors.New("heartbeat: bad status code") - } + body := strings.ReplaceAll(a.heartbeatWebhookBody, "%p", string(probe)) + body = strings.ReplaceAll(body, "%t", start.Format(time.RFC3339)) + body = strings.ReplaceAll(body, "%x", strconv.FormatInt(start.Unix(), 10)) + body = strings.ReplaceAll(body, "%l", fmt.Sprintf("/cpu?probe=%s", probe)) - if a.heartbeatSleep > 0 { - err = a.probeRepository.SetHeartbeat(probe, a.heartbeatSleep) - if err != nil { - return err - } - } + req, err := http.NewRequest(a.heartbeatWebhookMethod, a.heartbeatWebhookURL, bytes.NewBuffer([]byte(body))) + if err != nil { + a.errorLog.Print(err) + + return } - return nil + for key, value := range a.heartbeatWebhookHeader { + req.Header.Set(key, value) + } + + resp, err := a.client.Do(req) + if err != nil { + a.errorLog.Print(err) + + return + } + + defer resp.Body.Close() + + if resp.StatusCode > 400 { + a.errorLog.Print(errors.New("heartbeat: bad status code")) + + return + } + + if a.heartbeatSleep > 0 { + err = a.probeRepository.SetHeartbeat(probe, a.heartbeatSleep) + if err != nil { + a.errorLog.Print(err) + + return + } + } }