diff --git a/cmd/promster/main.go b/cmd/promster/main.go index ab788e1..e433ea2 100644 --- a/cmd/promster/main.go +++ b/cmd/promster/main.go @@ -2,20 +2,20 @@ package main import ( "context" - "encoding/json" "fmt" "net/http" "os" "os/signal" + "reflect" "strings" "sync" "text/template" "time" + _ "embed" "github.com/sirupsen/logrus" "github.com/urfave/cli/v3" etcdregistry "go.lumeweb.com/etcd-registry" - _ "embed" ) //go:embed prometheus.yml.tmpl @@ -27,22 +27,55 @@ const ( ) type BasicAuth struct { + Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` } -type SourceTarget struct { - Targets []string `json:"targets"` - Labels map[string]string `json:"labels,omitempty"` - BasicAuth *BasicAuth `json:"basic_auth,omitempty"` +type ServiceGroup struct { + Name string `json:"name"` + Targets []string `json:"targets"` + BasicAuth *BasicAuth `json:"basic_auth,omitempty"` } -type RecordingRule struct { - name string - expr string - labels map[string]string +type PrometheusConfig struct { + ServiceGroups []ServiceGroup `json:"service_groups"` + ScrapeInterval string `json:"scrape_interval"` + ScrapeTimeout string `json:"scrape_timeout"` + EvaluationInterval string `json:"evaluation_interval"` + Scheme string `json:"scheme"` + TlsInsecure string `json:"tls_insecure"` + AdminUsername string `json:"admin_username,omitempty"` + AdminPassword string `json:"admin_password,omitempty"` +} + +func (sg *ServiceGroup) Validate() error { + if sg.Name == "" { + return fmt.Errorf("service group name cannot be empty") + } + if len(sg.Targets) == 0 { + return fmt.Errorf("service group %s must have at least one target", sg.Name) + } + if sg.BasicAuth != nil { + if sg.BasicAuth.Username == "" { + return fmt.Errorf("service group %s basic auth username cannot be empty", sg.Name) + } + if sg.BasicAuth.Password == "" { + return fmt.Errorf("service group %s basic auth password cannot be empty", sg.Name) + } + } + return nil } func executeTemplate(data interface{}) (string, error) { + // Validate service groups before template execution + config, ok := data.(PrometheusConfig) + if ok { + for _, sg := range config.ServiceGroups { + if err := sg.Validate(); err != nil { + return "", fmt.Errorf("invalid service group configuration: %w", err) + } + } + } tmpl, err := template.New(PROM_TEMPLATE_FILE).Parse(prometheusTemplate) if err != nil { return "", err @@ -91,8 +124,10 @@ func reloadPrometheus() error { } } -func getScrapeTargets(registry *etcdregistry.EtcdRegistry, scrapeEtcdPaths []string) []SourceTarget { - targets := make([]SourceTarget, 0) +func getScrapeTargets(registry *etcdregistry.EtcdRegistry, scrapeEtcdPaths []string) []ServiceGroup { + // Map to group targets by service name and auth + groupMap := make(map[string]*ServiceGroup) + for _, path := range scrapeEtcdPaths { nodes, err := registry.GetServiceNodes(path) if err != nil { @@ -101,78 +136,72 @@ func getScrapeTargets(registry *etcdregistry.EtcdRegistry, scrapeEtcdPaths []str } for _, node := range nodes { - target := SourceTarget{ - Labels: map[string]string{"prsn": node.Name}, - Targets: []string{node.Info["address"]}, - } + serviceName := node.Name + address := node.Info["address"] + // Create auth key for grouping + authKey := "" if password, ok := node.Info["password"]; ok { - target.BasicAuth = &BasicAuth{ - Password: password, + username := node.Info["username"] + if username == "" { + username = "prometheus" // default username } + authKey = username + ":" + password } - targets = append(targets, target) - } - } - return targets -} - -func areScrapeTargetsEqual(a, b []SourceTarget) bool { - if len(a) != len(b) { - return false - } - - for i := range a { - if !areSourceTargetsEqual(a[i], b[i]) { - return false - } - } - - return true -} + // Create group key + groupKey := serviceName + "|" + authKey -func areSourceTargetsEqual(a, b SourceTarget) bool { - if len(a.Targets) != len(b.Targets) { - return false - } + group, exists := groupMap[groupKey] + if !exists { + group = &ServiceGroup{ + Name: serviceName, + Targets: make([]string, 0), + } - for i := range a.Targets { - if a.Targets[i] != b.Targets[i] { - return false - } - } + // Only set auth if credentials exist + if authKey != "" { + username := node.Info["username"] + if username == "" { + username = "prometheus" + } + group.BasicAuth = &BasicAuth{ + Username: username, + Password: node.Info["password"], + } + } - if len(a.Labels) != len(b.Labels) { - return false - } + groupMap[groupKey] = group + } - for k, v := range a.Labels { - if b.Labels[k] != v { - return false + group.Targets = append(group.Targets, address) } } - if (a.BasicAuth == nil) != (b.BasicAuth == nil) { - return false - } - if a.BasicAuth != nil && b.BasicAuth != nil { - if a.BasicAuth.Password != b.BasicAuth.Password { - return false - } + // Convert map to slice + groups := make([]ServiceGroup, 0, len(groupMap)) + for _, group := range groupMap { + groups = append(groups, *group) } - return true + return groups } -func updatePrometheusConfig(configFile string, config map[string]interface{}) error { - adminUsername := os.Getenv("PROMETHEUS_ADMIN_USERNAME") - adminPassword := os.Getenv("PROMETHEUS_ADMIN_PASSWORD") - config["adminUsername"] = adminUsername - if adminPassword != "" { - config["adminPassword"] = adminPassword +func createPrometheusConfig(serviceGroups []ServiceGroup) PrometheusConfig { + return PrometheusConfig{ + ServiceGroups: serviceGroups, + ScrapeInterval: scrapeInterval, + ScrapeTimeout: scrapeTimeout, + EvaluationInterval: evaluationInterval, + Scheme: scheme, + TlsInsecure: tlsInsecure, + AdminUsername: os.Getenv("PROMETHEUS_ADMIN_USERNAME"), + AdminPassword: os.Getenv("PROMETHEUS_ADMIN_PASSWORD"), } +} +func updatePrometheusConfig(configFile string, serviceGroups []ServiceGroup) error { + config := createPrometheusConfig(serviceGroups) contents, err := executeTemplate(config) if err != nil { return fmt.Errorf("failed to execute template: %w", err) @@ -185,116 +214,41 @@ func updatePrometheusConfig(configFile string, config map[string]interface{}) er return reloadPrometheus() } -func updatePrometheusTargets(scrapeTargets []SourceTarget) error { - contents, err := json.Marshal(scrapeTargets) - if err != nil { - return fmt.Errorf("failed to marshal targets: %w", err) - } - - if err := writeConfigFile("/servers.json", contents); err != nil { - return err - } - - return reloadPrometheus() -} - -func getLabelMap(rawLabels string) map[string]string { - toReturn := make(map[string]string) - mappings := strings.Split(rawLabels, ",") - for _, mapping := range mappings { - if mapping != "" { - var keyValue = strings.Split(mapping, ":") - toReturn[keyValue[0]] = keyValue[1] - } - } - - return toReturn -} - -func getPrintableLabels(labels map[string]string) string { - if len(labels) <= 0 { - return "" - } - - var toReturn = ` - labels:` - for k, v := range labels { - var format = ` - %s: %s` - toReturn += fmt.Sprintf(format, k, v) - } - return toReturn -} - -func createRulesFromENV(rulesFile string) error { - env := make(map[string]string) - for _, e := range os.Environ() { - pair := strings.Split(e, "=") - env[pair[0]] = pair[1] - } - rules := make([]RecordingRule, 0) - for i := 1; i < 100; i++ { - kname := fmt.Sprintf("RECORD_RULE_%d_NAME", i) - kexpr := fmt.Sprintf("RECORD_RULE_%d_EXPR", i) - klabels := fmt.Sprintf("RECORD_RULE_%d_LABELS", i) - - vname, exists := env[kname] - if !exists { - break - } - vexpr, exists := env[kexpr] - if !exists { - break - } - - rules = append(rules, RecordingRule{name: vname, expr: vexpr, labels: getLabelMap(env[klabels])}) - } - - if len(rules) == 0 { - logrus.Infof("No prometheus rules found in environment variables") - return nil - } - - rulesContents := `groups: - - name: env-rules - rules:` - - for _, v := range rules { - rc := `%s - - record: %s - expr: %s -%s -` - rulesContents = fmt.Sprintf(rc, rulesContents, v.name, v.expr, getPrintableLabels(v.labels)) - } - - if err := writeConfigFile(rulesFile, []byte(rulesContents)); err != nil { - return err - } - - return reloadPrometheus() -} +var ( + scrapeInterval string + scrapeTimeout string + evaluationInterval string + scheme string + tlsInsecure string + configFile string +) func monitorTargets(ctx context.Context, registry *etcdregistry.EtcdRegistry, scrapeEtcdPaths []string) error { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() var ( - previousTargets []SourceTarget - targetsMu sync.RWMutex + previousGroups []ServiceGroup + targetsMu sync.RWMutex ) for { select { case <-ticker.C: - targets := getScrapeTargets(registry, scrapeEtcdPaths) + serviceGroups := getScrapeTargets(registry, scrapeEtcdPaths) targetsMu.Lock() - if !areScrapeTargetsEqual(targets, previousTargets) { - if err := updatePrometheusTargets(targets); err != nil { - logrus.WithError(err).Error("Failed to update prometheus targets") + if !reflect.DeepEqual(serviceGroups, previousGroups) { + logrus.WithFields(logrus.Fields{ + "num_groups": len(serviceGroups), + "groups": serviceGroups, + }).Info("Service groups changed, updating configuration") + + if err := updatePrometheusConfig(configFile, serviceGroups); err != nil { + logrus.WithError(err).Error("Failed to update prometheus config") } else { - previousTargets = targets + logrus.Info("Successfully updated prometheus configuration") + previousGroups = serviceGroups } } targetsMu.Unlock() @@ -322,11 +276,11 @@ func run(ctx context.Context, cmd *cli.Command) error { etcdURLScrape := cmd.String("scrape-etcd-url") etcdBasePath := cmd.String("etcd-base-path") scrapeEtcdPaths := strings.Split(cmd.String("scrape-etcd-paths"), ",") - scrapeInterval := cmd.String("scrape-interval") - scrapeTimeout := cmd.String("scrape-timeout") - evaluationInterval := cmd.String("evaluation-interval") - scheme := cmd.String("scheme") - tlsInsecure := cmd.String("tls-insecure") + scrapeInterval = cmd.String("scrape-interval") + scrapeTimeout = cmd.String("scrape-timeout") + evaluationInterval = cmd.String("evaluation-interval") + scheme = cmd.String("scheme") + tlsInsecure = cmd.String("tls-insecure") etcdUsername := cmd.String("etcd-username") etcdPassword := cmd.String("etcd-password") etcdTimeout := cmd.Duration("etcd-timeout") @@ -345,25 +299,26 @@ func run(ctx context.Context, cmd *cli.Command) error { return fmt.Errorf("failed to create etcd registry: %w", err) } - // Create initial prometheus config without scrape paths - config := map[string]interface{}{ - "scrapeInterval": scrapeInterval, - "scrapeTimeout": scrapeTimeout, - "evaluationInterval": evaluationInterval, - "scheme": scheme, - "tlsInsecure": tlsInsecure, - } - - configFile := os.Getenv("PROMETHEUS_CONFIG_FILE") + configFile = os.Getenv("PROMETHEUS_CONFIG_FILE") if configFile == "" { configFile = PROM_CONFIG_FILE } - if err := updatePrometheusConfig(configFile, config); err != nil { + // Verify admin credentials are set + if os.Getenv("PROMETHEUS_ADMIN_USERNAME") == "" || os.Getenv("PROMETHEUS_ADMIN_PASSWORD") == "" { + return fmt.Errorf("PROMETHEUS_ADMIN_USERNAME and PROMETHEUS_ADMIN_PASSWORD must be set") + } + + if err := updatePrometheusConfig(configFile, []ServiceGroup{}); err != nil { return fmt.Errorf("failed to update initial prometheus config: %w", err) } - if err := createRulesFromENV("/rules.yml"); err != nil { + rulesFile := os.Getenv("PROMETHEUS_RULES_FILE") + if rulesFile == "" { + rulesFile = "/rules.yml" + } + + if err := createRulesFromENV(rulesFile); err != nil { return fmt.Errorf("failed to create rules: %w", err) } diff --git a/cmd/promster/prometheus.yml.tmpl b/cmd/promster/prometheus.yml.tmpl index 560f718..aac91f6 100644 --- a/cmd/promster/prometheus.yml.tmpl +++ b/cmd/promster/prometheus.yml.tmpl @@ -1,7 +1,7 @@ global: - scrape_interval: {{.scrapeInterval}} - scrape_timeout: {{.scrapeTimeout}} - evaluation_interval: {{.evaluationInterval}} + scrape_interval: {{.ScrapeInterval}} + scrape_timeout: {{.ScrapeTimeout}} + evaluation_interval: {{.EvaluationInterval}} rule_files: - /rules.yml @@ -10,17 +10,23 @@ scrape_configs: - job_name: 'prometheus' static_configs: - targets: ['localhost:9090'] - {{ if .adminPassword }} + {{ if .AdminPassword }} basic_auth: - username: {{.adminUsername}} - password: {{.adminPassword}} + username: {{.AdminUsername}} + password: {{.AdminPassword}} {{ end }} - - job_name: 'etcd' - scheme: {{.scheme}} + {{ range .ServiceGroups }} + - job_name: '{{ .Name }}' + scheme: {{$.Scheme}} tls_config: - insecure_skip_verify: {{.tlsInsecure}} - file_sd_configs: - - files: - - /servers.json + insecure_skip_verify: {{$.TlsInsecure}} + static_configs: + - targets: {{ .Targets }} + {{ if .BasicAuth }} + basic_auth: + username: {{ .BasicAuth.Username }} + password: {{ .BasicAuth.Password }} + {{ end }} metrics_path: /metrics + {{ end }} diff --git a/cmd/promster/rules.go b/cmd/promster/rules.go new file mode 100644 index 0000000..774acdc --- /dev/null +++ b/cmd/promster/rules.go @@ -0,0 +1,159 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + "gopkg.in/yaml.v2" +) + +type RecordingRule struct { + name string + expr string + labels map[string]string +} + +func getLabelMap(rawLabels string) (map[string]string, error) { + toReturn := make(map[string]string) + if rawLabels == "" { + return toReturn, nil + } + + mappings := strings.Split(rawLabels, ",") + for _, mapping := range mappings { + if mapping == "" { + continue + } + keyValue := strings.Split(mapping, ":") + if len(keyValue) != 2 { + return nil, fmt.Errorf("invalid label format: %s", mapping) + } + toReturn[strings.TrimSpace(keyValue[0])] = strings.TrimSpace(keyValue[1]) + } + return toReturn, nil +} + +func getPrintableLabels(labels map[string]string) string { + if len(labels) <= 0 { + return "" + } + + var toReturn = ` + labels:` + for k, v := range labels { + var format = ` + %s: %s` + toReturn += fmt.Sprintf(format, k, v) + } + return toReturn +} + +func validateRuleExpr(expr string) error { + if expr == "" { + return fmt.Errorf("expression cannot be empty") + } + return nil +} + +func createRulesFromENV(rulesFile string) error { + // Validate rules file path is writable + dir := filepath.Dir(rulesFile) + if err := unix.Access(dir, unix.W_OK); err != nil { + return fmt.Errorf("rules directory %s is not writable: %w", dir, err) + } + + // Check if rules file exists and is writable + if _, err := os.Stat(rulesFile); err == nil { + if err := unix.Access(rulesFile, unix.W_OK); err != nil { + return fmt.Errorf("rules file %s is not writable: %w", rulesFile, err) + } + } else { + // Create rules file if it doesn't exist + if _, err := os.OpenFile(rulesFile, os.O_CREATE, 0644); err != nil { + return fmt.Errorf("failed to create rules file: %w", err) + } + } + + maxRules := 100 + if maxStr := os.Getenv("PROMETHEUS_MAX_RULES"); maxStr != "" { + if max, err := strconv.Atoi(maxStr); err == nil && max > 0 { + maxRules = max + } + } + + env := make(map[string]string) + for _, e := range os.Environ() { + pair := strings.Split(e, "=") + env[pair[0]] = pair[1] + } + + rules := make([]RecordingRule, 0) + for i := 1; i <= maxRules; i++ { + kname := fmt.Sprintf("RECORD_RULE_%d_NAME", i) + kexpr := fmt.Sprintf("RECORD_RULE_%d_EXPR", i) + klabels := fmt.Sprintf("RECORD_RULE_%d_LABELS", i) + + vname, exists := env[kname] + if !exists { + break + } + vexpr, exists := env[kexpr] + if !exists { + break + } + + labels, err := getLabelMap(env[klabels]) + if err != nil { + return fmt.Errorf("invalid labels for rule %s: %w", vname, err) + } + + if err := validateRuleExpr(vexpr); err != nil { + return fmt.Errorf("invalid expression for rule %s: %w", vname, err) + } + + rules = append(rules, RecordingRule{ + name: vname, + expr: vexpr, + labels: labels, + }) + } + + if len(rules) == 0 { + logrus.Info("No prometheus rules found in environment variables") + return nil + } + + rulesContents := `groups: + - name: env-rules + rules:` + + for _, v := range rules { + rc := `%s + - record: %s + expr: %s +%s +` + rulesContents = fmt.Sprintf(rc, rulesContents, v.name, v.expr, getPrintableLabels(v.labels)) + } + + // Validate YAML before writing + if err := validateYAML([]byte(rulesContents)); err != nil { + return fmt.Errorf("invalid YAML rules: %w", err) + } + + if err := writeConfigFile(rulesFile, []byte(rulesContents)); err != nil { + return fmt.Errorf("failed to write rules file: %w", err) + } + + return reloadPrometheus() +} + +func validateYAML(data []byte) error { + var out interface{} + return yaml.Unmarshal(data, &out) +} diff --git a/go.mod b/go.mod index 5cc023b..8c3364f 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,9 @@ go 1.23.2 require ( github.com/sirupsen/logrus v1.9.3 github.com/urfave/cli/v3 v3.0.0-beta1 - go.etcd.io/etcd/client/v3 v3.5.17 go.lumeweb.com/etcd-registry v0.0.0-20241225024027-d8584c28e112 + golang.org/x/sys v0.18.0 + gopkg.in/yaml.v2 v2.4.0 ) require ( @@ -17,11 +18,11 @@ require ( github.com/pkg/errors v0.9.1 // indirect go.etcd.io/etcd/api/v3 v3.5.17 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.17 // indirect + go.etcd.io/etcd/client/v3 v3.5.17 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect diff --git a/go.sum b/go.sum index ba87b71..76e9975 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,7 @@ google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=