Skip to content

Commit

Permalink
[horus] New modularity features (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfordjody committed Sep 16, 2024
1 parent ae85f1f commit e4dd626
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 16 deletions.
40 changes: 24 additions & 16 deletions app/horus/basic/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,22 @@ import (
)

type NodeDataInfo struct {
Id int64 `json:"id"`
NodeName string `json:"node_name" xorm:"node_name"`
NodeIP string `json:"node_ip" xorm:"node_ip"`
Sn string `json:"sn"`
ClusterName string `json:"cluster_name" xorm:"cluster_name"`
ModuleName string `json:"module_name" xorm:"module_name"`
Reason string `json:"reason"`
Restart uint32 `json:"restart"`
Repair uint32 `json:"repair"`
RepairTicketUrl string `json:"repair_ticket_url" xorm:"repair_ticket_url"`
FirstDate string `json:"first_date" xorm:"first_date"`
CreateTime string `json:"create_time" xorm:"create_time created"`
UpdateTime string `json:"update_time" xorm:"update_time updated"`
RecoveryMark int64 `json:"recovery_mark" xorm:"recovery_mark"`
RecoveryQL string `json:"recovery_ql" xorm:"recovery_ql"`
Id int64 `json:"id"`
NodeName string `json:"node_name" xorm:"node_name"`
NodeIP string `json:"node_ip" xorm:"node_ip"`
Sn string `json:"sn"`
ClusterName string `json:"cluster_name" xorm:"cluster_name"`
ModuleName string `json:"module_name" xorm:"module_name"`
Reason string `json:"reason"`
Restart uint32 `json:"restart"`
Repair uint32 `json:"repair"`
RepairTicketUrl string `json:"repair_ticket_url" xorm:"repair_ticket_url"`
FirstDate string `json:"first_date" xorm:"first_date"`
CreateTime string `json:"create_time" xorm:"create_time created"`
UpdateTime string `json:"update_time" xorm:"update_time updated"`
RecoveryMark int64 `json:"recovery_mark" xorm:"recovery_mark"`
RecoveryQL string `json:"recovery_ql" xorm:"recovery_ql"`
CustomizeRecoveryModular map[string]string `xorm:"-"`
}

type PodDataInfo struct {
Expand Down Expand Up @@ -123,7 +124,14 @@ func GetRecoveryNodeDataInfoDate(day int) ([]*NodeDataInfo, error) {
var ndi []*NodeDataInfo
session := db.Where(fmt.Sprintf("recovery_mark = 0 AND first_date > DATE_SUB(CURDATE(), INTERVAL %d DAY)", day))
err := session.Find(&ndi)
return nil, err
return ndi, err
}

func GetDailyLimitNodeDataInfoDate(day, module, cluster string) ([]*NodeDataInfo, error) {
var ndi []*NodeDataInfo
session := db.Where(fmt.Sprintf("DATE(first_date)='%s' AND module_name='%s' AND cluster_name='%s", day, module, cluster))
err := session.Find(&ndi)
return ndi, err
}

func (n *NodeDataInfo) RecoveryMarker() (bool, error) {
Expand Down
8 changes: 8 additions & 0 deletions app/horus/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func main() {
}
return nil
})
group.Add(func() error {
klog.Info("horus customize modular manager start success.")
err := horus.CustomizeModularManager(ctx)
if err != nil {
klog.Errorf("horus customize modular manager start failed error:%v", err)
}
return nil
})
group.Wait()
}

Expand Down
136 changes: 136 additions & 0 deletions app/horus/core/horuser/modular.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package horuser

import (
"context"
"fmt"
"github.com/apache/dubbo-kubernetes/app/horus/basic/db"
"github.com/apache/dubbo-kubernetes/app/horus/core/alert"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sync"
"time"
)

func (h *Horuser) CustomizeModularManager(ctx context.Context) error {
go wait.UntilWithContext(ctx, h.CustomizeModular, time.Duration(h.cc.CustomModular.CheckIntervalSecond)*time.Second)
<-ctx.Done()
return nil
}

func (h *Horuser) CustomizeModular(ctx context.Context) {
var wg sync.WaitGroup
for clusterName, addr := range h.cc.PromMultiple {
if _, exists := h.cc.CustomModular.KubeMultiple[clusterName]; !exists {
klog.Infof("CustomizeModular config disable clusterName: %v", clusterName)
continue
}
wg.Add(1)
go func(clusterName, addr string) {
defer wg.Done()
}(clusterName, addr)
}
wg.Wait()
}

func (h *Horuser) CustomizeModularOnCluster(clusterName, addr string) {
klog.Infof("CustomizeModularOnCluster Start clusterName:%v", clusterName)
for moduleName, ql := range h.cc.CustomModular.CheckQL {
vecs, err := h.InstantQuery(addr, ql, clusterName, h.cc.CustomModular.PromQueryTimeSecond)
if err != nil {
klog.Errorf("CustomizeModularOnCluster InstantQuery err:%v", err)
klog.Infof("clusterName:%vec ql: %v", clusterName, ql)
return
}
count := len(vecs)
for index, vec := range vecs {
labelMap := vec.Metric
nodeName := string(labelMap["node"])
if nodeName == "" {
klog.Warningf("CustomizeModularOnCluster empty.")
klog.Infof("clusterName:%v moduleName:%v index:%d", clusterName, moduleName, index)
continue
}
ip := string(labelMap["instance"])
value := vec.Value.String()
klog.Infof("RunCommonModuleOnCluster.QueryRes.print[clusterName:%v][moduleName:%v][%d][nodeName:%v][value:%v][count:%v]", clusterName, moduleName, index+1, nodeName, value, count)
h.CustomizeModularNodes(clusterName, moduleName, nodeName, ip)
}
}
}

func (h *Horuser) CustomizeModularNodes(clusterName, moduleName, nodeName, ip string) {
today := time.Now().Format("2006-01-02")

recoveryQL := h.cc.CustomModular.RecoveryQL[moduleName]
dailyLimit := h.cc.CustomModular.CordonDailyLimit[moduleName]

data, err := db.GetDailyLimitNodeDataInfoDate(today, moduleName, clusterName)
if err != nil {
klog.Errorf("CustomizeModularNodes GetDailyLimitNodeDataInfoDate err:%v", err)
return
}
klog.Infof("%v", data)
if len(data) > dailyLimit {
msg := fmt.Sprintf("【日期:%v】 【集群:%v\n】 【模块今日 Cordon 节点数: %v】\n 【已达到今日上限: %v】\n [节点:%v]",
data, clusterName, moduleName, dailyLimit, nodeName)
klog.Infof(msg)
klog.Infof("Attempting to send DingTalk message (limit exceeded): %s", msg)
alert.DingTalkSend(h.cc.CustomModular.DingTalk, msg)
klog.Infof("DingTalk message sent (limit exceeded)")
return
}

write := db.NodeDataInfo{
NodeName: nodeName,
NodeIP: ip,
ClusterName: clusterName,
ModuleName: moduleName,
Reason: moduleName,
FirstDate: today,
RecoveryQL: recoveryQL,
}
pass, _ := write.Check()
if pass {
klog.Infof("CustomizeModularNodes already existing clusterName:%v nodeName:%v moduleName:%v", clusterName, nodeName, moduleName)
return
}
err = h.Cordon(nodeName, clusterName)
res := "success"
if err != nil {
res = fmt.Sprintf("failed:%v", err)
klog.Errorf("Cordon failed:%v", err)
}
if err != nil {
res = fmt.Sprintf("failed:%v", err)
}

msg := fmt.Sprintf("【集群:%v】\n 【%s 插件 Cordon 节点:%v】\n 【结果: %v】\n 【今日操作次数:%v】",
clusterName, moduleName, nodeName, res, len(today)+1)
klog.Infof(msg)

klog.Infof("Attempting to send DingTalk message: %s", msg)
alert.DingTalkSend(h.cc.CustomModular.DingTalk, msg)
klog.Infof("DingTalk message sent")

_, err = write.AddOrGet()
if err != nil {
klog.Errorf("CustomizeModularNodes AddOrGet err:%v", err)
klog.Infof("moduleName:%v nodeName:%v", moduleName, nodeName)
}
klog.Infof("CustomizeModularNodes AddOrGet success moduleName:%v nodeName:%v", moduleName, nodeName)
}

0 comments on commit e4dd626

Please sign in to comment.