Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
petaki committed Oct 15, 2022
2 parents 621deea + 008cf53 commit 74160a7
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 82 deletions.
31 changes: 24 additions & 7 deletions internal/models/probe_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (rpr *RedisProbeRepository) FindLatestValues(probe Probe, limit int) ([]int
conn := rpr.RedisPool.Get()
defer conn.Close()

var fields []string
days := map[string][]string{}

now := time.Now()
end := time.Date(
Expand All @@ -89,14 +89,31 @@ func (rpr *RedisProbeRepository) FindLatestValues(probe Probe, limit int) ([]int
start := end.Add(-time.Duration(limit-1) * time.Minute)

for current := start; !current.After(end); current = current.Add(time.Minute) {
fields = append(fields, strconv.FormatInt(current.Unix(), 10))
day := strconv.FormatInt(time.Date(
current.Year(),
current.Month(),
current.Day(),
0,
0,
0,
0,
now.Location(),
).Unix(), 10)

days[day] = append(days[day], strconv.FormatInt(current.Unix(), 10))
}

values, err := redis.Values(
conn.Do("HMGET", redis.Args{}.Add(string(probe)+":"+seriesCPUKeyPrefix+strconv.FormatInt(today().Unix(), 10)).AddFlat(fields)...),
)
if err != nil {
return nil, nil, err
var values []interface{}

for day, fields := range days {
dayValues, err := redis.Values(
conn.Do("HMGET", redis.Args{}.Add(string(probe)+":"+seriesCPUKeyPrefix+day).AddFlat(fields)...),
)
if err != nil {
return nil, nil, err
}

values = append(values, dayValues...)
}

return values, &start, nil
Expand Down
65 changes: 37 additions & 28 deletions internal/models/series_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,47 +41,56 @@ func (rsr *RedisSeriesRepository) FindDiskPaths(probe Probe) ([]string, error) {
conn := rsr.RedisPool.Get()
defer conn.Close()

cursor := 0
prefix := string(probe) + ":" + seriesDiskKeyPrefix + strconv.FormatInt(today().Unix(), 10) + ":"

var paths []string
timestamps := rsr.timestamps(Month)

for {
values, err := redis.Values(
conn.Do("SCAN", cursor, "MATCH", prefix+"*"),
)
if err != nil {
return nil, err
}
for i := len(timestamps) - 1; i >= 0; i-- {
cursor := 0
prefix := string(probe) + ":" + seriesDiskKeyPrefix + strconv.FormatInt(timestamps[i], 10) + ":"

cursor, err = redis.Int(values[0], nil)
if err != nil {
return nil, err
for {
values, err := redis.Values(
conn.Do("SCAN", cursor, "MATCH", prefix+"*"),
)
if err != nil {
return nil, err
}

cursor, err = redis.Int(values[0], nil)
if err != nil {
return nil, err
}

current, err := redis.Strings(values[1], nil)
if err != nil {
return nil, err
}

paths = append(paths, current...)

if cursor == 0 {
break
}
}

current, err := redis.Strings(values[1], nil)
if err != nil {
return nil, err
if len(paths) == 0 {
continue
}

paths = append(paths, current...)
for key, value := range paths {
path, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(value, prefix, ""))
if err != nil {
return nil, err
}

if cursor == 0 {
break
paths[key] = string(path)
}
}

for key, value := range paths {
path, err := base64.StdEncoding.DecodeString(strings.ReplaceAll(value, prefix, ""))
if err != nil {
return nil, err
}
sort.Strings(paths)

paths[key] = string(path)
break
}

sort.Strings(paths)

return paths, nil
}

Expand Down
121 changes: 74 additions & 47 deletions internal/web/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/petaki/satellite/internal/models"
)

func (a *app) heartbeat() {
Expand All @@ -23,71 +26,95 @@ func (a *app) handleProbes() error {
return err
}

var wg sync.WaitGroup

wg.Add(len(probes))

for _, probe := range probes {
sendWebhook := true
go a.handleProbe(probe, &wg)
}

values, start, err := a.probeRepository.FindLatestValues(probe, a.heartbeatWait)
if err != nil {
return err
}
wg.Wait()

for _, value := range values {
if value != nil {
sendWebhook = false
return nil
}

break
}
}
func (a *app) handleProbe(probe models.Probe, wg *sync.WaitGroup) {
defer wg.Done()

if !sendWebhook {
return nil
}
sendWebhook := true

if a.heartbeatSleep > 0 {
hasHeartbeat, err := a.probeRepository.HasHeartbeat(probe)
if err != nil {
return err
}
values, start, err := a.probeRepository.FindLatestValues(probe, a.heartbeatWait)
if err != nil {
a.errorLog.Print(err)

if hasHeartbeat {
return nil
}
}
return
}

a.infoLog.Printf("Calling the heartbeat webhook URL for %s...", probe)
for _, value := range values {
if value != nil {
sendWebhook = false

break
}
}

body := strings.ReplaceAll(a.heartbeatWebhookBody, "%p", string(probe))
body = strings.ReplaceAll(body, "%t", start.Format(time.RFC3339))
body = strings.ReplaceAll(body, "%x", strconv.FormatInt(start.Unix(), 10))
body = strings.ReplaceAll(body, "%l", fmt.Sprintf("/cpu?probe=%s", probe))
if !sendWebhook {
return
}

req, err := http.NewRequest(a.heartbeatWebhookMethod, a.heartbeatWebhookURL, bytes.NewBuffer([]byte(body)))
if a.heartbeatSleep > 0 {
hasHeartbeat, err := a.probeRepository.HasHeartbeat(probe)
if err != nil {
return err
}
a.errorLog.Print(err)

for key, value := range a.heartbeatWebhookHeader {
req.Header.Set(key, value)
return
}

resp, err := a.client.Do(req)
if err != nil {
return err
if hasHeartbeat {
return
}
}

defer resp.Body.Close()
a.infoLog.Printf("Calling the heartbeat webhook URL for %s...", probe)

if resp.StatusCode > 400 {
return errors.New("heartbeat: bad status code")
}
body := strings.ReplaceAll(a.heartbeatWebhookBody, "%p", string(probe))
body = strings.ReplaceAll(body, "%t", start.Format(time.RFC3339))
body = strings.ReplaceAll(body, "%x", strconv.FormatInt(start.Unix(), 10))
body = strings.ReplaceAll(body, "%l", fmt.Sprintf("/cpu?probe=%s", probe))

if a.heartbeatSleep > 0 {
err = a.probeRepository.SetHeartbeat(probe, a.heartbeatSleep)
if err != nil {
return err
}
}
req, err := http.NewRequest(a.heartbeatWebhookMethod, a.heartbeatWebhookURL, bytes.NewBuffer([]byte(body)))
if err != nil {
a.errorLog.Print(err)

return
}

return nil
for key, value := range a.heartbeatWebhookHeader {
req.Header.Set(key, value)
}

resp, err := a.client.Do(req)
if err != nil {
a.errorLog.Print(err)

return
}

defer resp.Body.Close()

if resp.StatusCode > 400 {
a.errorLog.Print(errors.New("heartbeat: bad status code"))

return
}

if a.heartbeatSleep > 0 {
err = a.probeRepository.SetHeartbeat(probe, a.heartbeatSleep)
if err != nil {
a.errorLog.Print(err)

return
}
}
}

0 comments on commit 74160a7

Please sign in to comment.