Skip to content

Commit

Permalink
Migrated notifications from webhook to power automate (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
jothilal22 authored Feb 18, 2025
1 parent 6c7874a commit 3c94828
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 184 deletions.
269 changes: 182 additions & 87 deletions alertmanager/teams/teams.go
Original file line number Diff line number Diff line change
@@ -1,119 +1,214 @@
package teams

import (
"bytes"
"encoding/json"
"fmt"
"io"

"net/http"

"github.com/abahmed/kwatch/config"
"github.com/abahmed/kwatch/event"
"github.com/sirupsen/logrus"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/abahmed/kwatch/config"
"github.com/abahmed/kwatch/event"
"github.com/sirupsen/logrus"
)

const (
defaultTeamsTitle = "⛑ Kwatch detected a crash in pod"
defaultTeamsTitle = "⛑ Kwatch detected a crash in pod"
)

type Teams struct {
webhook string
title string
text string
// The HTTP trigger URL for the Power Automate flow
flowURL string
title string
text string

// reference for general app configuration
appCfg *config.App
// reference for general app configuration
appCfg *config.App
}

type teamsWebhookPayload struct {
Title string `json:"title"`
Text string `json:"text"`
type teamsFlowPayload struct {
Title string `json:"title"`
Text string `json:"text"`
Attachment []map[string]interface{} `json:"attachment"`
}

// NewTeams returns new team instance
func NewTeams(config map[string]interface{}, appCfg *config.App) *Teams {
webhook, ok := config["webhook"].(string)
if !ok || len(webhook) == 0 {
logrus.Warnf("initializing Teams with empty webhook url")
return nil
}

logrus.Infof("initializing Teams with webhook url: %s", webhook)

title, _ := config["title"].(string)
text, _ := config["text"].(string)

return &Teams{
webhook: webhook,
title: title,
text: text,
appCfg: appCfg,
}
flowURL, ok := config["flowURL"].(string)
if !ok || len(flowURL) == 0 {
logrus.Warnf("initializing Teams with empty flow url")
return nil
}

logrus.Infof("initializing Teams with flow url: %s", flowURL)

title, _ := config["title"].(string)
text, _ := config["text"].(string)

return &Teams{
flowURL: flowURL,
title: title,
text: text,
appCfg: appCfg,
}
}

// Name returns name of the provider
func (t *Teams) Name() string {
return "Microsoft Teams"
return "Microsoft Teams"
}

// SendEvent sends event to the provider
// SendEvent sends event to the Power Automate flow
func (t *Teams) SendEvent(e *event.Event) error {
return t.sendAPI(t.buildRequestBodyTeams(e))
payload := t.buildRequestBodyTeams(e)
return t.sendAPI(payload)
}

// SendMessage sends text message to the provider
// SendMessage sends plain text message to the Power Automate flow
func (t *Teams) SendMessage(msg string) error {

msgPayload := &teamsWebhookPayload{
Text: msg,
}

jsonBytes, _ := json.Marshal(msgPayload)
return t.sendAPI(jsonBytes)
payload := t.buildRequestBodyMessage(msg)
return t.sendAPI(payload)
}

func (t *Teams) sendAPI(b []byte) error {
buffer := bytes.NewBuffer(b)
request, err := http.NewRequest(http.MethodPost, t.webhook, buffer)
if err != nil {
return err
}

request.Header.Set("Content-Type", "application/json")

client := &http.Client{}
response, err := client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()

if response.StatusCode != 200 {
body, _ := io.ReadAll(response.Body)
return fmt.Errorf(
"call to teams alert returned status code %d: %s",
response.StatusCode,
string(body))
}

return nil
// SendApi send the given payload to the Power Automate flow with retry logic
func (t *Teams) sendAPI(payload []byte) error {
// Number of retry attempts
maxRetries := 3
retryDelay := 5 * time.Second

// try to send the message up to "maxRetries" times
for attempts := 0; attempts < maxRetries; attempts++ {
request, err := http.NewRequest(http.MethodPost, t.flowURL, bytes.NewBuffer(payload))
if err != nil {
return fmt.Errorf("error creating HTTP request: %v", err)
}

request.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(request)
if err != nil {
return fmt.Errorf("failed to create HTTP response: %w", err)
}
defer resp.Body.Close()

// Check for success (HTTP 200 OK)
if resp.StatusCode == http.StatusOK {
return nil
}

// Handle specific 400 errors (TriggerInputSchemaMismatch)
if resp.StatusCode == http.StatusBadRequest {
body, _ := io.ReadAll(resp.Body)
// Check for error (TriggerInputSchemaMismatch)
if strings.Contains(string(body), "TriggerInputSchemaMismatch") {
return fmt.Errorf("failed to send message due to schema mismatch: %s", string(body))
}
return fmt.Errorf("call to power automate flow returned status %d: %s", resp.StatusCode, string(body))
}

// Handle 202 status and retry
if resp.StatusCode == http.StatusAccepted {
logrus.Warnf("Request accepted by Power Automate flow, but not processed immediately. Attempt %d of %d.", attempts+1, maxRetries)
} else {
// For other non-200 status codes, log the error
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("call to power automate flow returned status %d: %s", resp.StatusCode, string(body))
}

// Wait for a delay before retrying
if attempts < maxRetries-1 {
time.Sleep(retryDelay)
}
}

// After all retries, return an error
return fmt.Errorf("failed to send message after %d attempts", maxRetries)
}

// buildRequestBodyTeams builds formatted string from event
// buildRequestBodyTeams builds the request body for the Power Automate flow
func (t *Teams) buildRequestBodyTeams(e *event.Event) []byte {
// use custom title if it's provided, otherwise use default
title := t.title
if len(title) == 0 {
title = defaultTeamsTitle
}

msg := e.FormatMarkdown(t.appCfg.ClusterName, t.text, "\n\n")
msgPayload := &teamsWebhookPayload{
Title: title,
Text: msg,
}

jsonBytes, _ := json.Marshal(msgPayload)
return (jsonBytes)
// Use custom title if it's provided, otherwise use the default title
title := t.title
if len(title) == 0 {
title = defaultTeamsTitle
}

// Format the message with markdown
msg := e.FormatMarkdown(t.appCfg.ClusterName, t.text, "\n\n")

// Create the attachment for the message with full event details
attachments := []map[string]interface{}{
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": map[string]interface{}{
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.2",
"body": []map[string]interface{}{
{
"type": "TextBlock",
"text": title,
},
{
"type": "TextBlock",
"text": fmt.Sprintf("Pod Name: %s", e.PodName),
},
{
"type": "TextBlock",
"text": fmt.Sprintf("Namespace: %s", e.Namespace),
},
{
"type": "TextBlock",
"text": fmt.Sprintf("Reason: %s", e.Reason),
},
{
"type": "TextBlock",
"text": fmt.Sprintf("Logs: %s", e.Logs),
},
{
"type": "TextBlock",
"text": fmt.Sprintf("Events: \n%s", e.Events),
},
{
"type": "TextBlock",
"text": fmt.Sprintf("Time: %s", time.Now().Format(time.RFC1123)),
},
},
},
},
}

// Prepare the payload for the Power Automate flow
payload := &teamsFlowPayload{
Title: title,
Text: msg,
Attachment: attachments, // Attachment should be an array
}

jsonBytes, err := json.Marshal(payload)
if err != nil {
logrus.Errorf("failed to marshal payload: %v", err)
return nil
}
return jsonBytes
}

// buildRequestBodyMessage builds plain message payload for the Power Automate flow
func (t *Teams) buildRequestBodyMessage(msg string) []byte {
payload := &teamsFlowPayload{
Title: "New Alert",
Text: msg,
// Empty attachments array to prevent schema mismatch error
Attachment: []map[string]interface{}{},
}

jsonBytes, err := json.Marshal(payload)
if err != nil {
logrus.Errorf("failed to marshal payload: %v", err)
return nil
}
return jsonBytes
}
Loading

0 comments on commit 3c94828

Please sign in to comment.