refactor: webhook send event (ccfos#2248)
Co-authored-by: Xu Bin <[email protected]>
710leo and Reditiny authored Oct 28, 2024
1 parent 5213b1d commit 102549c
Showing 7 changed files with 315 additions and 31 deletions.
49 changes: 29 additions & 20 deletions alert/eval/eval.go
@@ -3,6 +3,7 @@ package eval
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"reflect"
@@ -117,21 +118,29 @@ func (arw *AlertRuleWorker) Eval() {
arw.processor.Stats.CounterRuleEval.WithLabelValues().Inc()

typ := cachedRule.GetRuleType()
var anomalyPoints []common.AnomalyPoint
var recoverPoints []common.AnomalyPoint
var (
anomalyPoints []common.AnomalyPoint
recoverPoints []common.AnomalyPoint
err error
)
switch typ {
case models.PROMETHEUS:
anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
anomalyPoints, err = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
case models.HOST:
anomalyPoints = arw.GetHostAnomalyPoint(cachedRule.RuleConfig)
anomalyPoints, err = arw.GetHostAnomalyPoint(cachedRule.RuleConfig)
case models.TDENGINE:
anomalyPoints, recoverPoints = arw.GetTdengineAnomalyPoint(cachedRule, arw.processor.DatasourceId())
anomalyPoints, recoverPoints, err = arw.GetTdengineAnomalyPoint(cachedRule, arw.processor.DatasourceId())
case models.LOKI:
anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
anomalyPoints, err = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
default:
return
}

if err != nil {
logger.Errorf("rule_eval:%s get anomaly point err:%s", arw.Key(), err.Error())
return
}

if arw.processor == nil {
logger.Warningf("rule_eval:%s processor is nil", arw.Key())
return
@@ -179,21 +188,21 @@ func (arw *AlertRuleWorker) Stop() {
close(arw.quit)
}

func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.AnomalyPoint {
func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]common.AnomalyPoint, error) {
var lst []common.AnomalyPoint
var severity int

var rule *models.PromRuleConfig
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, err
}

if rule == nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, errors.New("rule is nil")
}

arw.inhibit = rule.Inhibit
@@ -224,7 +233,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
logger.Errorf("rule_eval:%s promql:%s, error:%v", arw.Key(), promql, err)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
continue
return lst, err
}

if len(warnings) > 0 {
@@ -241,18 +250,18 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
}
lst = append(lst, points...)
}
return lst
return lst, nil
}

func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]common.AnomalyPoint, []common.AnomalyPoint) {
func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]common.AnomalyPoint, []common.AnomalyPoint, error) {
// Get the query and the rule-evaluation conditions
points := []common.AnomalyPoint{}
recoverPoints := []common.AnomalyPoint{}
ruleConfig := strings.TrimSpace(rule.RuleConfig)
if ruleConfig == "" {
logger.Warningf("rule_eval:%d promql is blank", rule.Id)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return points, recoverPoints
return points, recoverPoints, errors.New("rule config is nil")
}

var ruleQuery models.RuleQuery
@@ -261,7 +270,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval:%d promql parse error:%s", rule.Id, err.Error())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId())).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return points, recoverPoints
return points, recoverPoints, err
}

arw.inhibit = ruleQuery.Inhibit
@@ -288,7 +297,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
continue
return points, recoverPoints, err
}
// This log line is important: it captures the live values used for the alert decision
logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)
@@ -305,24 +314,24 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
points, recoverPoints = GetAnomalyPoint(rule.Id, ruleQuery, seriesTagIndexes, seriesStore)
}

return points, recoverPoints
return points, recoverPoints, nil
}

func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.AnomalyPoint {
func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]common.AnomalyPoint, error) {
var lst []common.AnomalyPoint
var severity int

var rule *models.HostRuleConfig
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, err
}

if rule == nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, errors.New("rule is nil")
}

arw.inhibit = rule.Inhibit
@@ -449,7 +458,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
}
}
}
return lst
return lst, nil
}

func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
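The eval.go change above makes every Get*AnomalyPoint helper return an error alongside its points, and Eval now aborts the cycle when any helper fails instead of treating the failure as "no anomalies". A minimal, self-contained sketch of that pattern, using stand-in types rather than repository code:

package main

import (
	"errors"
	"fmt"
)

type anomalyPoint struct{ value float64 }

// getAnomalyPoints mirrors the new signature: points plus an error.
// Before the refactor a failed query was skipped with `continue`;
// now the error is surfaced so the caller can abort the whole cycle.
func getAnomalyPoints(promqls []string) ([]anomalyPoint, error) {
	var lst []anomalyPoint
	for _, q := range promqls {
		if q == "" {
			return lst, errors.New("query failed")
		}
		lst = append(lst, anomalyPoint{value: 1})
	}
	return lst, nil
}

func main() {
	if _, err := getAnomalyPoints([]string{"up", ""}); err != nil {
		fmt.Println("rule evaluation skipped this cycle:", err)
	}
}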
8 changes: 4 additions & 4 deletions alert/sender/callback.go
@@ -186,8 +186,8 @@ func PushCallbackEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.

if queue == nil {
queue = &WebhookQueue{
list: NewSafeListLimited(QueueMaxSize),
closeCh: make(chan struct{}),
eventQueue: NewSafeEventQueue(QueueMaxSize),
closeCh: make(chan struct{}),
}

CallbackEventQueueLock.Lock()
@@ -197,8 +197,8 @@ func PushCallbackEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.
StartConsumer(ctx, queue, webhook.Batch, webhook, stats)
}

succ := queue.list.PushFront(event)
succ := queue.eventQueue.Push(event)
if !succ {
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.list.Len(), event)
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.eventQueue.Len(), event)
}
}
14 changes: 7 additions & 7 deletions alert/sender/webhook.go
@@ -121,8 +121,8 @@ var EventQueueLock sync.RWMutex
const QueueMaxSize = 100000

type WebhookQueue struct {
list *SafeListLimited
closeCh chan struct{}
eventQueue *SafeEventQueue
closeCh chan struct{}
}

func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
@@ -132,8 +132,8 @@ func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCur

if queue == nil {
queue = &WebhookQueue{
list: NewSafeListLimited(QueueMaxSize),
closeCh: make(chan struct{}),
eventQueue: NewSafeEventQueue(QueueMaxSize),
closeCh: make(chan struct{}),
}

EventQueueLock.Lock()
@@ -143,10 +143,10 @@ func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCur
StartConsumer(ctx, queue, webhook.Batch, webhook, stats)
}

succ := queue.list.PushFront(event)
succ := queue.eventQueue.Push(event)
if !succ {
stats.AlertNotifyErrorTotal.WithLabelValues("push_event_queue").Inc()
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.list.Len(), event)
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.eventQueue.Len(), event)
}
}

@@ -157,7 +157,7 @@ func StartConsumer(ctx *ctx.Context, queue *WebhookQueue, popSize int, webhook *
logger.Infof("event queue:%v closed", queue)
return
default:
events := queue.list.PopBack(popSize)
events := queue.eventQueue.PopN(popSize)
if len(events) == 0 {
time.Sleep(time.Millisecond * 400)
continue
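StartConsumer above now drains the shared queue in batches through PopN and sleeps 400ms when nothing is queued. A rough, self-contained sketch of that loop shape with stand-in types (the real consumer additionally posts each batch to the webhook and records stats):

package main

import (
	"fmt"
	"time"
)

type batchQueue struct{ items []string }

// PopN pops up to n items, mirroring SafeEventQueue.PopN.
func (q *batchQueue) PopN(n int) []string {
	if n > len(q.items) {
		n = len(q.items)
	}
	out := q.items[:n]
	q.items = q.items[n:]
	return out
}

func main() {
	q := &batchQueue{items: []string{"e1", "e2", "e3", "e4", "e5"}}
	closeCh := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		for {
			select {
			case <-closeCh:
				return
			default:
				batch := q.PopN(2)
				if len(batch) == 0 {
					time.Sleep(400 * time.Millisecond) // same back-off as the diff
					continue
				}
				fmt.Println("send batch:", batch)
			}
		}
	}()

	time.Sleep(time.Second)
	close(closeCh)
	<-done
}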
113 changes: 113 additions & 0 deletions alert/sender/webhook_event_queue.go
@@ -0,0 +1,113 @@
package sender

import (
"container/list"
"sync"

"github.com/ccfos/nightingale/v6/models"
)

type SafeEventQueue struct {
lock sync.RWMutex
maxSize int
queueHigh *list.List
queueMiddle *list.List
queueLow *list.List
}

const (
High = 1
Middle = 2
Low = 3
)

func NewSafeEventQueue(maxSize int) *SafeEventQueue {
return &SafeEventQueue{
maxSize: maxSize,
lock: sync.RWMutex{},
queueHigh: list.New(),
queueMiddle: list.New(),
queueLow: list.New(),
}
}

func (spq *SafeEventQueue) Len() int {
spq.lock.RLock()
defer spq.lock.RUnlock()
return spq.queueHigh.Len() + spq.queueMiddle.Len() + spq.queueLow.Len()
}

// len reads the total length without taking the lock; do not call it outside this file
func (spq *SafeEventQueue) len() int {
return spq.queueHigh.Len() + spq.queueMiddle.Len() + spq.queueLow.Len()
}

func (spq *SafeEventQueue) Push(event *models.AlertCurEvent) bool {
spq.lock.Lock()
defer spq.lock.Unlock()
switch event.Severity {
case High:
spq.queueHigh.PushBack(event)
case Middle:
spq.queueMiddle.PushBack(event)
case Low:
spq.queueLow.PushBack(event)
default:
return false
}

for spq.len() > spq.maxSize {
if spq.queueLow.Len() > 0 {
spq.queueLow.Remove(spq.queueLow.Front())
} else if spq.queueMiddle.Len() > 0 {
spq.queueMiddle.Remove(spq.queueMiddle.Front())
} else {
spq.queueHigh.Remove(spq.queueHigh.Front())
}
}
return true
}

// pop removes the next event without taking the lock; do not call it outside this file
func (spq *SafeEventQueue) pop() *models.AlertCurEvent {
if spq.len() == 0 {
return nil
}

var elem interface{}

if spq.queueHigh.Len() > 0 {
elem = spq.queueHigh.Remove(spq.queueHigh.Front())
} else if spq.queueMiddle.Len() > 0 {
elem = spq.queueMiddle.Remove(spq.queueMiddle.Front())
} else {
elem = spq.queueLow.Remove(spq.queueLow.Front())
}
event, ok := elem.(*models.AlertCurEvent)
if !ok {
return nil
}
return event
}

func (spq *SafeEventQueue) Pop() *models.AlertCurEvent {
spq.lock.Lock()
defer spq.lock.Unlock()
return spq.pop()
}

func (spq *SafeEventQueue) PopN(n int) []*models.AlertCurEvent {
spq.lock.Lock()
defer spq.lock.Unlock()

events := make([]*models.AlertCurEvent, 0, n)
count := 0
for count < n && spq.len() > 0 {
event := spq.pop()
if event != nil {
events = append(events, event)
}
count++
}
return events
}
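A hypothetical standalone usage sketch of the new SafeEventQueue follows; it assumes the import paths below are reachable from an external main package and that models.AlertCurEvent.Severity uses the same 1/2/3 values as the High/Middle/Low constants:

// Hypothetical usage sketch, not repository code.
package main

import (
	"fmt"

	"github.com/ccfos/nightingale/v6/alert/sender"
	"github.com/ccfos/nightingale/v6/models"
)

func main() {
	q := sender.NewSafeEventQueue(3) // keep at most 3 events in total

	q.Push(&models.AlertCurEvent{Severity: 3}) // Low
	q.Push(&models.AlertCurEvent{Severity: 2}) // Middle
	q.Push(&models.AlertCurEvent{Severity: 1}) // High

	// A fourth event overflows the queue; the oldest Low event is evicted first.
	q.Push(&models.AlertCurEvent{Severity: 1}) // High

	// PopN drains High before Middle before Low, so this prints 1, 1, 2.
	for _, ev := range q.PopN(10) {
		fmt.Println(ev.Severity)
	}
}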