Skip to content

Commit

Permalink
refactor: Add retry mechanism to Promster with backoff utility
Browse files Browse the repository at this point in the history
  • Loading branch information
pcfreak30 committed Dec 28, 2024
1 parent c8941a1 commit 5f3042b
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 94 deletions.
220 changes: 126 additions & 94 deletions cmd/promster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/urfave/cli/v3"
"gopkg.in/yaml.v2"
etcdregistry "go.lumeweb.com/etcd-registry"
"go.lumeweb.com/promster/pkg/util"
)

//go:embed prometheus.yml.tmpl
Expand Down Expand Up @@ -116,96 +117,115 @@ func writeConfigFile(filename string, data []byte) error {
func reloadPrometheus() error {
adminUsername := os.Getenv("PROMETHEUS_ADMIN_USERNAME")
adminPassword := os.Getenv("PROMETHEUS_ADMIN_PASSWORD")
if adminUsername != "" && adminPassword != "" {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:9090/-/reload", nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.SetBasicAuth(adminUsername, adminPassword)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to reload prometheus config: %w", err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
logrus.Errorf("Failed to close response body: %v", err)
}
}(resp.Body)
return nil
} else {
if adminUsername == "" || adminPassword == "" {
return fmt.Errorf("PROMETHEUS_ADMIN_USERNAME and PROMETHEUS_ADMIN_PASSWORD must be set")
}
}

func getScrapeTargets(ctx context.Context, registry *etcdregistry.EtcdRegistry) ([]ServiceGroup, error) {
// Get all service groups
services, err := registry.GetServiceGroups(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get service groups: %w", err)
}
_, err := util.RetryOperation(
func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var groups []ServiceGroup
req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:9090/-/reload", nil)
if err != nil {
return false, fmt.Errorf("failed to create request: %w", err)
}
req.SetBasicAuth(adminUsername, adminPassword)

// For each service, get its group and nodes
for _, serviceName := range services {
group, err := registry.GetServiceGroup(ctx, serviceName)
if err != nil {
logrus.WithError(err).Warnf("Failed to get service group %s", serviceName)
continue
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, fmt.Errorf("failed to reload prometheus config: %w", err)
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
logrus.Errorf("Failed to close response body: %v", err)
}
}(resp.Body)

nodes, err := group.GetNodes(ctx)
if err != nil {
logrus.WithError(err).Warnf("Failed to get nodes for service %s", serviceName)
continue
}
return true, nil
},
util.ConfigRetry,
3,
)

// Create a ServiceGroup for each node
for _, node := range nodes {
address := fmt.Sprintf("%s:%d", node.ID, node.Port)
if address == "" {
logrus.Warnf("Invalid address for node %s in service %s", node.ID, serviceName)
continue
}
return err
}

metricsPath := node.MetricsPath
if metricsPath == "" {
metricsPath = "/metrics"
func getScrapeTargets(ctx context.Context, registry *etcdregistry.EtcdRegistry) ([]ServiceGroup, error) {
groups, err := util.RetryOperation(
func() ([]ServiceGroup, error) {
// Get all service groups
services, err := registry.GetServiceGroups(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get service groups: %w", err)
}

// Merge group common labels with node labels
labels := make(map[string]string)
for k, v := range group.Spec.CommonLabels {
labels[k] = v
}
for k, v := range node.Labels {
labels[k] = v
}
var groups []ServiceGroup

sg := ServiceGroup{
Name: serviceName,
Targets: []string{address},
MetricsPath: metricsPath,
NodeID: node.ID,
Labels: labels,
}
// For each service, get its group and nodes
for _, serviceName := range services {
group, err := registry.GetServiceGroup(ctx, serviceName)
if err != nil {
logrus.WithError(err).Warnf("Failed to get service group %s", serviceName)
continue
}

nodes, err := group.GetNodes(ctx)
if err != nil {
logrus.WithError(err).Warnf("Failed to get nodes for service %s", serviceName)
continue
}

// Use auth from group spec if available
if group.Spec.Username != "" && group.Spec.Password != "" {
sg.BasicAuth = &BasicAuth{
Username: group.Spec.Username,
Password: group.Spec.Password,
// Create a ServiceGroup for each node
for _, node := range nodes {
address := fmt.Sprintf("%s:%d", node.ID, node.Port)
if address == "" {
logrus.Warnf("Invalid address for node %s in service %s", node.ID, serviceName)
continue
}

metricsPath := node.MetricsPath
if metricsPath == "" {
metricsPath = "/metrics"
}

// Merge group common labels with node labels
labels := make(map[string]string)
for k, v := range group.Spec.CommonLabels {
labels[k] = v
}
for k, v := range node.Labels {
labels[k] = v
}

sg := ServiceGroup{
Name: serviceName,
Targets: []string{address},
MetricsPath: metricsPath,
NodeID: node.ID,
Labels: labels,
}

// Use auth from group spec if available
if group.Spec.Username != "" && group.Spec.Password != "" {
sg.BasicAuth = &BasicAuth{
Username: group.Spec.Username,
Password: group.Spec.Password,
}
}

groups = append(groups, sg)
}
}

groups = append(groups, sg)
}
}
return groups, nil
},
util.EtcdRetry,
3,
)

return groups, nil
return groups, err
}

func createPrometheusConfig(serviceGroups []ServiceGroup) PrometheusConfig {
Expand All @@ -232,30 +252,42 @@ func createPrometheusConfig(serviceGroups []ServiceGroup) PrometheusConfig {
}

func updatePrometheusConfig(configFile string, serviceGroups []ServiceGroup) error {
config := createPrometheusConfig(serviceGroups)
contents, err := executeTemplate(config)
if err != nil {
return fmt.Errorf("failed to execute template: %w", err)
}
_, err := util.RetryOperation(
func() (bool, error) {
config := createPrometheusConfig(serviceGroups)
contents, err := executeTemplate(config)
if err != nil {
return false, fmt.Errorf("failed to execute template: %w", err)
}

// Add YAML validation
var testConfig map[string]interface{}
if err := yaml.Unmarshal([]byte(contents), &testConfig); err != nil {
logrus.WithError(err).WithField("config", contents).Error("Generated invalid YAML configuration")
return fmt.Errorf("generated invalid YAML: %w", err)
}
// Add YAML validation
var testConfig map[string]interface{}
if err := yaml.Unmarshal([]byte(contents), &testConfig); err != nil {
logrus.WithError(err).WithField("config", contents).Error("Generated invalid YAML configuration")
return false, fmt.Errorf("generated invalid YAML: %w", err)
}

// Log the generated configuration for debugging
logrus.WithFields(logrus.Fields{
"config_file": configFile,
"yaml": contents,
}).Debug("Writing new configuration")
// Log the generated configuration for debugging
logrus.WithFields(logrus.Fields{
"config_file": configFile,
"yaml": contents,
}).Debug("Writing new configuration")

if err := writeConfigFile(configFile, []byte(contents)); err != nil {
return fmt.Errorf("failed to write config: %w", err)
}
if err := writeConfigFile(configFile, []byte(contents)); err != nil {
return false, fmt.Errorf("failed to write config: %w", err)
}

if err := reloadPrometheus(); err != nil {
return false, fmt.Errorf("failed to reload prometheus: %w", err)
}

return true, nil
},
util.ConfigRetry,
3,
)

return reloadPrometheus()
return err
}

var (
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
)

require (
github.com/cenkalti/backoff/v5 v5.0.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/cenkalti/backoff/v5 v5.0.0 h1:4ziwFuaVJicDO1ah1Nz1aXXV1caM28PFgf1V5TTFXew=
github.com/cenkalti/backoff/v5 v5.0.0/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
Expand Down
51 changes: 51 additions & 0 deletions pkg/util/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package util

import (
"context"
"github.com/cenkalti/backoff/v5"
"time"
)

// RetryConfig provides standard retry configurations
type RetryConfig struct {
InitialInterval time.Duration
MaxInterval time.Duration
MaxElapsedTime time.Duration
RandomizationFactor float64
}

var (
// EtcdRetry is configured for etcd operations
EtcdRetry = RetryConfig{
InitialInterval: 500 * time.Millisecond,
MaxInterval: 10 * time.Second,
RandomizationFactor: 0.1,
}

// ConfigRetry is configured for Prometheus configuration operations
ConfigRetry = RetryConfig{
InitialInterval: 1 * time.Second,
MaxInterval: 5 * time.Second,
RandomizationFactor: 0.1,
}
)

// RetryOperation executes an operation with retry logic
func RetryOperation[T any](operation func() (T, error), config RetryConfig, maxRetries uint) (T, error) {
b := backoff.NewExponentialBackOff()
b.InitialInterval = config.InitialInterval
b.MaxInterval = config.MaxInterval
b.RandomizationFactor = config.RandomizationFactor

var result T
ctx := context.Background()

result, err := backoff.Retry(
ctx,
operation,
backoff.WithMaxTries(maxRetries),
backoff.WithBackOff(b),
)

return result, err
}

0 comments on commit 5f3042b

Please sign in to comment.