Skip to content

Commit

Permalink
Update db backup (#4976)
Browse files Browse the repository at this point in the history
* update kafka monitor

* if backup exceed 1 hour,send notification
  • Loading branch information
wallyxjh authored Aug 21, 2024
1 parent ae37ede commit 0549a9a
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions service/exceptionmonitor/helper/monitor/database_backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,34 @@ func checkDatabaseBackups() error {

func processBackup(backup unstructured.Unstructured) {
status, found, err := unstructured.NestedString(backup.Object, "status", "phase")
backupName, namespace, startTime := backup.GetName(), backup.GetNamespace(), backup.GetCreationTimestamp().String()
backupName, namespace, startTimestamp := backup.GetName(), backup.GetNamespace(), backup.GetCreationTimestamp().String()
if err != nil {
log.Printf("Unable to get %s status in ns %s:%v", backupName, namespace, err)
log.Printf("Unable to get %s status in %s:%v", backupName, namespace, err)
return
}
if !found || status != "Failed" {
if !found || (status != "Failed" && status != "InProgress") {
return
}
if status == "InProgress" {
startTime, err := time.Parse(time.RFC3339, startTimestamp)
if err != nil {
log.Printf("%s Unable to parsing time in %s:%v", backupName, namespace, err)
fmt.Println("Error parsing time:", err)
return
}
currentTime := time.Now().UTC()
duration := currentTime.Sub(startTime)
if duration >= time.Hour {
SendBackupNotification(backupName, namespace, status, startTimestamp)
}
return
}
fmt.Println(backupName, namespace)
debt, _, _ := checkDebt(namespace)
if !debt {
return
}
backupPolicyName, _, _ := unstructured.NestedString(backup.Object, "spec", "backupPolicyName")
databaseName := getPrefix(backupPolicyName)
fmt.Println(databaseName)
cluster, err := api.DynamicClient.Resource(databaseClusterGVR).Namespace(namespace).Get(context.Background(), databaseName, metav1.GetOptions{})
if cluster != nil && errors.IsNotFound(err) {
return
Expand All @@ -70,7 +82,10 @@ func processBackup(backup unstructured.Unstructured) {
if dbStatus == "Stopped" {
return
}
fmt.Println(dbStatus)
SendBackupNotification(backupName, namespace, status, startTimestamp)
}

func SendBackupNotification(backupName, namespace, status, startTimestamp string) {
notificationInfo := notification.Info{
DatabaseClusterName: backupName,
Namespace: namespace,
Expand All @@ -80,7 +95,7 @@ func processBackup(backup unstructured.Unstructured) {
NotificationType: "exception",
}
if _, ok := api.LastBackupStatusMap[backupName]; !ok {
message := notification.GetBackupMessage("exception", namespace, backupName, status, startTime, "")
message := notification.GetBackupMessage("exception", namespace, backupName, status, startTimestamp, "")
if err := notification.SendFeishuNotification(notificationInfo, message, api.FeishuWebhookURLMap["FeishuWebhookURLBackup"]); err != nil {
log.Printf("Error sending exception notification:%v", err)
}
Expand Down

0 comments on commit 0549a9a

Please sign in to comment.