From 6d9a4fc5cb77ddcf1b63f04988e9ac79b9331b10 Mon Sep 17 00:00:00 2001 From: iamrajiv Date: Sat, 2 Aug 2025 12:31:03 +0530 Subject: [PATCH 01/24] add echo --- internal/component/all/all.go | 1 + internal/component/prometheus/echo/echo.go | 279 +++++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 internal/component/prometheus/echo/echo.go diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 3f739dbe63f..5cb6351cb08 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -161,6 +161,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/operator/scrapeconfigs" // Import prometheus.operator.scrapeconfigs _ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors _ "github.com/grafana/alloy/internal/component/prometheus/receive_http" // Import prometheus.receive_http + _ "github.com/grafana/alloy/internal/component/prometheus/echo" // Import prometheus.echo _ "github.com/grafana/alloy/internal/component/prometheus/relabel" // Import prometheus.relabel _ "github.com/grafana/alloy/internal/component/prometheus/remotewrite" // Import prometheus.remote_write _ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape diff --git a/internal/component/prometheus/echo/echo.go b/internal/component/prometheus/echo/echo.go new file mode 100644 index 00000000000..bfc76a71f3a --- /dev/null +++ b/internal/component/prometheus/echo/echo.go @@ -0,0 +1,279 @@ +package echo + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +func init() { + component.Register(component.Registration{ + Name: "prometheus.echo", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + Exports: Exports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Arguments struct{} + +type Exports struct { + Receiver storage.Appendable `alloy:"receiver,attr"` +} + +var DefaultArguments = Arguments{} + +func (args *Arguments) SetToDefault() { + *args = DefaultArguments +} + +var ( + _ component.Component = (*Component)(nil) + _ storage.Appendable = (*Component)(nil) +) + +type Component struct { + opts component.Options + mut sync.RWMutex + args Arguments +} + +func New(o component.Options, args Arguments) (*Component, error) { + c := &Component{ + opts: o, + } + if err := c.Update(args); err != nil { + return nil, err + } + if o.OnStateChange != nil { + o.OnStateChange(Exports{Receiver: c}) + } + return c, nil +} + +func (c *Component) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + c.mut.Lock() + defer c.mut.Unlock() + c.args = newArgs + return nil +} + +func (c *Component) Appender(ctx context.Context) storage.Appender { + return &echoAppender{ + logger: c.opts.Logger, + componentID: c.opts.ID, + } +} + +type echoAppender struct { + logger log.Logger + componentID string + metrics []metricData + mut sync.Mutex +} + +type metricData struct { + labels labels.Labels + timestamp int64 + value float64 + histogram *histogram.Histogram + fHistogram *histogram.FloatHistogram + exemplar *exemplar.Exemplar + metadata *metadata.Metadata + isZeroSample bool + ct int64 +} + +func (a *echoAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + a.mut.Lock() + defer a.mut.Unlock() + a.metrics = append(a.metrics, metricData{ + labels: l.Copy(), + timestamp: t, + value: v, + }) + if ref == 0 { + ref = storage.SeriesRef(len(a.metrics)) + } + return ref, nil +} + +func (a *echoAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + a.mut.Lock() + defer a.mut.Unlock() + for i := len(a.metrics) - 1; i >= 0; i-- { + if labels.Equal(a.metrics[i].labels, l) { + a.metrics[i].exemplar = &e + break + } + } + if ref == 0 { + ref = storage.SeriesRef(1) + } + return ref, nil +} + +func (a *echoAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.mut.Lock() + defer a.mut.Unlock() + a.metrics = append(a.metrics, metricData{ + labels: l.Copy(), + timestamp: t, + histogram: h, + fHistogram: fh, + }) + if ref == 0 { + ref = storage.SeriesRef(len(a.metrics)) + } + return ref, nil +} + +func (a *echoAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + a.mut.Lock() + defer a.mut.Unlock() + for i := range a.metrics { + if labels.Equal(a.metrics[i].labels, l) { + a.metrics[i].metadata = &m + } + } + if ref == 0 { + ref = storage.SeriesRef(1) + } + return ref, nil +} + +func (a *echoAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + a.mut.Lock() + defer a.mut.Unlock() + a.metrics = append(a.metrics, metricData{ + labels: l.Copy(), + timestamp: t, + value: 0, + isZeroSample: true, + ct: ct, + }) + if ref == 0 { + ref = storage.SeriesRef(len(a.metrics)) + } + return ref, nil +} + +func (a *echoAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.mut.Lock() + defer a.mut.Unlock() + a.metrics = append(a.metrics, metricData{ + labels: l.Copy(), + timestamp: t, + histogram: h, + fHistogram: fh, + isZeroSample: true, + ct: ct, + }) + if ref == 0 { + ref = storage.SeriesRef(len(a.metrics)) + } + return ref, nil +} + +func (a *echoAppender) Commit() error { + a.mut.Lock() + defer a.mut.Unlock() + metricGroups := make(map[string][]metricData) + for _, m := range a.metrics { + name := m.labels.Get(model.MetricNameLabel) + if name == "" { + name = "{no_metric_name}" + } + metricGroups[name] = append(metricGroups[name], m) + } + var output strings.Builder + output.WriteString(fmt.Sprintf("# Prometheus metrics received by %s\n", a.componentID)) + output.WriteString(fmt.Sprintf("# Timestamp: %s\n", time.Now().Format(time.RFC3339))) + for metricName, metrics := range metricGroups { + if len(metrics) == 0 { + continue + } + if metrics[0].metadata != nil { + if metrics[0].metadata.Help != "" { + output.WriteString(fmt.Sprintf("# HELP %s %s\n", metricName, metrics[0].metadata.Help)) + } + if metrics[0].metadata.Type != "" { + output.WriteString(fmt.Sprintf("# TYPE %s %s\n", metricName, metrics[0].metadata.Type)) + } + } + for _, m := range metrics { + output.WriteString(formatMetric(m)) + } + } + level.Info(a.logger).Log("component", a.componentID, "metrics", output.String()) + a.metrics = a.metrics[:0] + return nil +} + +func (a *echoAppender) Rollback() error { + a.mut.Lock() + defer a.mut.Unlock() + a.metrics = a.metrics[:0] + return nil +} + +func formatMetric(m metricData) string { + var output strings.Builder + metricName := m.labels.Get(model.MetricNameLabel) + if metricName == "" { + metricName = "{no_metric_name}" + } + output.WriteString(metricName) + labelPairs := make([]string, 0, len(m.labels)-1) + for _, l := range m.labels { + if l.Name != model.MetricNameLabel { + labelPairs = append(labelPairs, fmt.Sprintf("%s=%q", l.Name, l.Value)) + } + } + if len(labelPairs) > 0 { + output.WriteString("{") + output.WriteString(strings.Join(labelPairs, ",")) + output.WriteString("}") + } + if m.histogram != nil || m.fHistogram != nil { + output.WriteString(" {histogram}") + } else { + output.WriteString(fmt.Sprintf(" %g", m.value)) + } + if m.timestamp > 0 { + output.WriteString(fmt.Sprintf(" %d", m.timestamp)) + } + if m.exemplar != nil { + output.WriteString(fmt.Sprintf(" # {%s} %g", m.exemplar.Labels, m.exemplar.Value)) + } + if m.isZeroSample && m.ct > 0 { + output.WriteString(fmt.Sprintf(" # CreatedTimestamp: %d", m.ct)) + } + output.WriteString("\n") + return output.String() +} + +func (a *echoAppender) SetOptions(opts *storage.AppendOptions) { +} From f2347b752e41c9ef7085d84f1da019f41495dc5b Mon Sep 17 00:00:00 2001 From: iamrajiv Date: Sat, 2 Aug 2025 12:31:09 +0530 Subject: [PATCH 02/24] add test --- .../component/prometheus/echo/echo_test.go | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 internal/component/prometheus/echo/echo_test.go diff --git a/internal/component/prometheus/echo/echo_test.go b/internal/component/prometheus/echo/echo_test.go new file mode 100644 index 00000000000..42998d6e408 --- /dev/null +++ b/internal/component/prometheus/echo/echo_test.go @@ -0,0 +1,143 @@ +package echo + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestArgumentsDefaults(t *testing.T) { + args := Arguments{} + args.SetToDefault() + require.Equal(t, DefaultArguments, args) +} + +func TestComponent_Creation(t *testing.T) { + ctx := componenttest.TestContext(t) + + comp, err := New(component.Options{ + ID: "test", + Logger: log.NewNopLogger(), + }, Arguments{}) + require.NoError(t, err) + require.NotNil(t, comp) + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + go func() { + err := comp.Run(ctx) + require.NoError(t, err) + }() + + time.Sleep(100 * time.Millisecond) +} + +func TestComponent_ExportsReceiver(t *testing.T) { + var exports component.Exports + + comp, err := New(component.Options{ + ID: "test", + Logger: log.NewNopLogger(), + OnStateChange: func(e component.Exports) { + exports = e + }, + }, Arguments{}) + require.NoError(t, err) + require.NotNil(t, comp) + + echoExports, ok := exports.(Exports) + require.True(t, ok) + require.NotNil(t, echoExports.Receiver) + + require.Equal(t, comp, echoExports.Receiver) +} + +func TestAppender_BasicMetrics(t *testing.T) { + comp, err := New(component.Options{ + ID: "test", + Logger: log.NewNopLogger(), + }, Arguments{}) + require.NoError(t, err) + + ctx := context.Background() + appender := comp.Appender(ctx) + require.NotNil(t, appender) + + lbls := labels.FromStrings("__name__", "test_metric", "job", "test") + + ref, err := appender.Append(0, lbls, time.Now().Unix(), 42.0) + require.NoError(t, err) + require.NotEqual(t, storage.SeriesRef(0), ref) + + err = appender.Commit() + require.NoError(t, err) +} + +func TestAppender_Rollback(t *testing.T) { + comp, err := New(component.Options{ + ID: "test", + Logger: log.NewNopLogger(), + }, Arguments{}) + require.NoError(t, err) + + ctx := context.Background() + appender := comp.Appender(ctx) + require.NotNil(t, appender) + + lbls := labels.FromStrings("__name__", "test_metric", "job", "test") + + _, err = appender.Append(0, lbls, time.Now().Unix(), 42.0) + require.NoError(t, err) + + err = appender.Rollback() + require.NoError(t, err) +} + +func TestAppender_MultipleMetrics(t *testing.T) { + comp, err := New(component.Options{ + ID: "test", + Logger: log.NewNopLogger(), + }, Arguments{}) + require.NoError(t, err) + + ctx := context.Background() + appender := comp.Appender(ctx) + require.NotNil(t, appender) + + metrics := []struct { + name string + value float64 + }{ + {"metric_one", 1.0}, + {"metric_two", 2.0}, + {"metric_three", 3.0}, + } + + for _, metric := range metrics { + lbls := labels.FromStrings("__name__", metric.name, "job", "test") + _, err = appender.Append(0, lbls, time.Now().Unix(), metric.value) + require.NoError(t, err) + } + + err = appender.Commit() + require.NoError(t, err) +} + +func TestComponent_Update(t *testing.T) { + comp, err := New(component.Options{ + ID: "test", + Logger: log.NewNopLogger(), + }, Arguments{}) + require.NoError(t, err) + + err = comp.Update(Arguments{}) + require.NoError(t, err) +} From a253d8e0694e8626b3dc6ddd44b320ac346a5623 Mon Sep 17 00:00:00 2001 From: iamrajiv Date: Sat, 2 Aug 2025 12:34:10 +0530 Subject: [PATCH 03/24] add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81a10cd3b14..2b374eff6a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -289,6 +289,8 @@ v1.11.0 ### Features +- Add `prometheus.echo` component for local inspection of Prometheus metrics. The component writes received metrics to stdout in Prometheus exposition format, enabling easier debugging and testing of metrics flow. (@iamrajiv) + - Add the `otelcol.receiver.fluentforward` receiver to receive logs via Fluent Forward Protocol. (@rucciva) - Add the `prometheus.enrich` component to enrich metrics using labels from `discovery.*` components. (@ArkovKonstantin) From 1e44b4e654d9e04efe8bbe3128ebdf277c4033f6 Mon Sep 17 00:00:00 2001 From: iamrajiv Date: Mon, 4 Aug 2025 22:37:40 +0530 Subject: [PATCH 04/24] use prometheus expfmt encoder --- internal/component/prometheus/echo/echo.go | 464 ++++++++++++++---- .../component/prometheus/echo/echo_test.go | 82 ++++ 2 files changed, 439 insertions(+), 107 deletions(-) diff --git a/internal/component/prometheus/echo/echo.go b/internal/component/prometheus/echo/echo.go index bfc76a71f3a..ce7aa07f456 100644 --- a/internal/component/prometheus/echo/echo.go +++ b/internal/component/prometheus/echo/echo.go @@ -1,9 +1,9 @@ package echo import ( + "bytes" "context" - "fmt" - "strings" + "sort" "sync" "time" @@ -11,12 +11,16 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" + "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/utils/ptr" ) func init() { @@ -31,13 +35,17 @@ func init() { }) } -type Arguments struct{} +type Arguments struct { + Format string `alloy:"format,attr,optional"` +} type Exports struct { Receiver storage.Appendable `alloy:"receiver,attr"` } -var DefaultArguments = Arguments{} +var DefaultArguments = Arguments{ + Format: "text", +} func (args *Arguments) SetToDefault() { *args = DefaultArguments @@ -81,41 +89,64 @@ func (c *Component) Update(args component.Arguments) error { } func (c *Component) Appender(ctx context.Context) storage.Appender { + c.mut.RLock() + format := c.args.Format + c.mut.RUnlock() + return &echoAppender{ logger: c.opts.Logger, componentID: c.opts.ID, + format: format, + samples: make(map[string]sample), + exemplars: make(map[string]seriesExemplar), + histograms: make(map[string]seriesHistogram), + metadata: make(map[string]metadata.Metadata), } } type echoAppender struct { logger log.Logger componentID string - metrics []metricData + format string mut sync.Mutex + + samples map[string]sample + exemplars map[string]seriesExemplar + histograms map[string]seriesHistogram + metadata map[string]metadata.Metadata } -type metricData struct { - labels labels.Labels - timestamp int64 - value float64 - histogram *histogram.Histogram - fHistogram *histogram.FloatHistogram - exemplar *exemplar.Exemplar - metadata *metadata.Metadata - isZeroSample bool - ct int64 +type sample struct { + Labels labels.Labels + Timestamp int64 + Value float64 + PrintTimestamp bool +} + +type seriesExemplar struct { + Labels labels.Labels + Exemplar exemplar.Exemplar +} + +type seriesHistogram struct { + Labels labels.Labels + Histogram histogram.Histogram } func (a *echoAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { a.mut.Lock() defer a.mut.Unlock() - a.metrics = append(a.metrics, metricData{ - labels: l.Copy(), - timestamp: t, - value: v, - }) + + key := l.String() + a.samples[key] = sample{ + Labels: l.Copy(), + Timestamp: t, + Value: v, + PrintTimestamp: t > 0, + } + if ref == 0 { - ref = storage.SeriesRef(len(a.metrics)) + ref = storage.SeriesRef(len(a.samples)) } return ref, nil } @@ -123,12 +154,13 @@ func (a *echoAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v func (a *echoAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { a.mut.Lock() defer a.mut.Unlock() - for i := len(a.metrics) - 1; i >= 0; i-- { - if labels.Equal(a.metrics[i].labels, l) { - a.metrics[i].exemplar = &e - break - } + + key := l.String() + a.exemplars[key] = seriesExemplar{ + Labels: l.Copy(), + Exemplar: e, } + if ref == 0 { ref = storage.SeriesRef(1) } @@ -138,14 +170,17 @@ func (a *echoAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e func (a *echoAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { a.mut.Lock() defer a.mut.Unlock() - a.metrics = append(a.metrics, metricData{ - labels: l.Copy(), - timestamp: t, - histogram: h, - fHistogram: fh, - }) + + key := l.String() + if h != nil { + a.histograms[key] = seriesHistogram{ + Labels: l.Copy(), + Histogram: *h, + } + } + if ref == 0 { - ref = storage.SeriesRef(len(a.metrics)) + ref = storage.SeriesRef(len(a.histograms)) } return ref, nil } @@ -153,11 +188,12 @@ func (a *echoAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t func (a *echoAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { a.mut.Lock() defer a.mut.Unlock() - for i := range a.metrics { - if labels.Equal(a.metrics[i].labels, l) { - a.metrics[i].metadata = &m - } + + metricName := l.Get(model.MetricNameLabel) + if metricName != "" { + a.metadata[metricName] = m } + if ref == 0 { ref = storage.SeriesRef(1) } @@ -167,15 +203,17 @@ func (a *echoAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m func (a *echoAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { a.mut.Lock() defer a.mut.Unlock() - a.metrics = append(a.metrics, metricData{ - labels: l.Copy(), - timestamp: t, - value: 0, - isZeroSample: true, - ct: ct, - }) + + key := l.String() + a.samples[key] = sample{ + Labels: l.Copy(), + Timestamp: t, + Value: 0, + PrintTimestamp: t > 0, + } + if ref == 0 { - ref = storage.SeriesRef(len(a.metrics)) + ref = storage.SeriesRef(len(a.samples)) } return ref, nil } @@ -183,16 +221,17 @@ func (a *echoAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels func (a *echoAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { a.mut.Lock() defer a.mut.Unlock() - a.metrics = append(a.metrics, metricData{ - labels: l.Copy(), - timestamp: t, - histogram: h, - fHistogram: fh, - isZeroSample: true, - ct: ct, - }) + + key := l.String() + if h != nil { + a.histograms[key] = seriesHistogram{ + Labels: l.Copy(), + Histogram: *h, + } + } + if ref == 0 { - ref = storage.SeriesRef(len(a.metrics)) + ref = storage.SeriesRef(len(a.histograms)) } return ref, nil } @@ -200,80 +239,291 @@ func (a *echoAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labe func (a *echoAppender) Commit() error { a.mut.Lock() defer a.mut.Unlock() - metricGroups := make(map[string][]metricData) - for _, m := range a.metrics { - name := m.labels.Get(model.MetricNameLabel) - if name == "" { - name = "{no_metric_name}" - } - metricGroups[name] = append(metricGroups[name], m) + + families := a.buildMetricFamilies() + + var buf bytes.Buffer + var expFormat expfmt.Format + + switch a.format { + case "openmetrics": + expFormat = expfmt.NewFormat(expfmt.TypeOpenMetrics) + case "text", "": + expFormat = expfmt.NewFormat(expfmt.TypeTextPlain) + default: + level.Warn(a.logger).Log("component", a.componentID, "msg", "unknown format, using text", "format", a.format) + expFormat = expfmt.NewFormat(expfmt.TypeTextPlain) } - var output strings.Builder - output.WriteString(fmt.Sprintf("# Prometheus metrics received by %s\n", a.componentID)) - output.WriteString(fmt.Sprintf("# Timestamp: %s\n", time.Now().Format(time.RFC3339))) - for metricName, metrics := range metricGroups { - if len(metrics) == 0 { + + encoder := expfmt.NewEncoder(&buf, expFormat) + + for _, family := range families { + if err := encoder.Encode(family); err != nil { + level.Error(a.logger).Log("component", a.componentID, "error", "failed to encode metric family", "family", family.GetName(), "err", err) continue } - if metrics[0].metadata != nil { - if metrics[0].metadata.Help != "" { - output.WriteString(fmt.Sprintf("# HELP %s %s\n", metricName, metrics[0].metadata.Help)) - } - if metrics[0].metadata.Type != "" { - output.WriteString(fmt.Sprintf("# TYPE %s %s\n", metricName, metrics[0].metadata.Type)) - } - } - for _, m := range metrics { - output.WriteString(formatMetric(m)) - } } - level.Info(a.logger).Log("component", a.componentID, "metrics", output.String()) - a.metrics = a.metrics[:0] + + level.Info(a.logger).Log("component", a.componentID, "metrics", buf.String()) + + a.clearStorage() + return nil } func (a *echoAppender) Rollback() error { a.mut.Lock() defer a.mut.Unlock() - a.metrics = a.metrics[:0] + + a.clearStorage() + return nil } -func formatMetric(m metricData) string { - var output strings.Builder - metricName := m.labels.Get(model.MetricNameLabel) - if metricName == "" { - metricName = "{no_metric_name}" +func (a *echoAppender) SetOptions(opts *storage.AppendOptions) { +} + +func (a *echoAppender) clearStorage() { + for k := range a.samples { + delete(a.samples, k) + } + for k := range a.exemplars { + delete(a.exemplars, k) + } + for k := range a.histograms { + delete(a.histograms, k) + } + for k := range a.metadata { + delete(a.metadata, k) + } +} + +func (a *echoAppender) buildMetricFamilies() []*dto.MetricFamily { + b := builder{ + samples: a.samples, + exemplars: a.exemplars, + metadata: a.metadata, + histograms: a.histograms, + + familyLookup: make(map[string]*dto.MetricFamily), } - output.WriteString(metricName) - labelPairs := make([]string, 0, len(m.labels)-1) - for _, l := range m.labels { - if l.Name != model.MetricNameLabel { - labelPairs = append(labelPairs, fmt.Sprintf("%s=%q", l.Name, l.Value)) + return b.build() +} + +type builder struct { + samples map[string]sample + exemplars map[string]seriesExemplar + metadata map[string]metadata.Metadata + histograms map[string]seriesHistogram + + families []*dto.MetricFamily + familyLookup map[string]*dto.MetricFamily +} + +func (b *builder) build() []*dto.MetricFamily { + b.buildFamiliesFromMetadata() + b.buildMetricsFromSamples() + b.buildHistograms() + b.assignExemplars() + b.sortFamilies() + + return b.families +} + +func (b *builder) buildFamiliesFromMetadata() { + for metricName, md := range b.metadata { + family := &dto.MetricFamily{ + Name: ptr.To(metricName), + Help: ptr.To(md.Help), } + + family.Type = ptr.To(dto.MetricType_UNTYPED) + + b.families = append(b.families, family) + b.familyLookup[metricName] = family } - if len(labelPairs) > 0 { - output.WriteString("{") - output.WriteString(strings.Join(labelPairs, ",")) - output.WriteString("}") +} + +func (b *builder) buildMetricsFromSamples() { + for _, sample := range b.samples { + metricName := sample.Labels.Get(model.MetricNameLabel) + if metricName == "" { + continue + } + + family := b.getOrCreateFamily(metricName) + metric := &dto.Metric{} + + for _, label := range sample.Labels { + if label.Name != model.MetricNameLabel { + metric.Label = append(metric.Label, &dto.LabelPair{ + Name: ptr.To(string(label.Name)), + Value: ptr.To(string(label.Value)), + }) + } + } + + switch family.GetType() { + case dto.MetricType_COUNTER: + metric.Counter = &dto.Counter{Value: ptr.To(sample.Value)} + case dto.MetricType_GAUGE: + metric.Gauge = &dto.Gauge{Value: ptr.To(sample.Value)} + default: + metric.Untyped = &dto.Untyped{Value: ptr.To(sample.Value)} + } + + if sample.PrintTimestamp { + metric.TimestampMs = ptr.To(sample.Timestamp) + } + + family.Metric = append(family.Metric, metric) } - if m.histogram != nil || m.fHistogram != nil { - output.WriteString(" {histogram}") - } else { - output.WriteString(fmt.Sprintf(" %g", m.value)) +} + +func (b *builder) buildHistograms() { + for _, hist := range b.histograms { + metricName := hist.Labels.Get(model.MetricNameLabel) + if metricName == "" { + continue + } + + family := b.getOrCreateFamily(metricName) + family.Type = ptr.To(dto.MetricType_HISTOGRAM) + + metric := &dto.Metric{} + + for _, label := range hist.Labels { + if label.Name != model.MetricNameLabel { + metric.Label = append(metric.Label, &dto.LabelPair{ + Name: ptr.To(string(label.Name)), + Value: ptr.To(string(label.Value)), + }) + } + } + + metric.Histogram = &dto.Histogram{ + SampleCount: ptr.To(hist.Histogram.Count), + SampleSum: ptr.To(hist.Histogram.Sum), + } + + family.Metric = append(family.Metric, metric) } - if m.timestamp > 0 { - output.WriteString(fmt.Sprintf(" %d", m.timestamp)) +} + +func (b *builder) assignExemplars() { + for _, ex := range b.exemplars { + metricName := ex.Labels.Get(model.MetricNameLabel) + if metricName == "" { + continue + } + + family := b.familyLookup[metricName] + if family == nil { + continue + } + + for _, metric := range family.Metric { + if b.labelsMatch(ex.Labels, metric.Label) { + exemplar := &dto.Exemplar{ + Value: ptr.To(ex.Exemplar.Value), + } + + if ex.Exemplar.HasTs { + ts := ex.Exemplar.Ts / 1000 + exemplar.Timestamp = timestamppb.New(time.Unix(ts, (ex.Exemplar.Ts%1000)*1e6)) + } + + for _, label := range ex.Exemplar.Labels { + exemplar.Label = append(exemplar.Label, &dto.LabelPair{ + Name: ptr.To(string(label.Name)), + Value: ptr.To(string(label.Value)), + }) + } + + if metric.Counter != nil { + metric.Counter.Exemplar = exemplar + } else if metric.Histogram != nil { + metric.Histogram.Bucket = append(metric.Histogram.Bucket, &dto.Bucket{ + Exemplar: exemplar, + }) + } + break + } + } } - if m.exemplar != nil { - output.WriteString(fmt.Sprintf(" # {%s} %g", m.exemplar.Labels, m.exemplar.Value)) +} + +func (b *builder) getOrCreateFamily(metricName string) *dto.MetricFamily { + if family, exists := b.familyLookup[metricName]; exists { + return family } - if m.isZeroSample && m.ct > 0 { - output.WriteString(fmt.Sprintf(" # CreatedTimestamp: %d", m.ct)) + + family := &dto.MetricFamily{ + Name: ptr.To(metricName), + Type: ptr.To(dto.MetricType_UNTYPED), } - output.WriteString("\n") - return output.String() + + b.families = append(b.families, family) + b.familyLookup[metricName] = family + return family } -func (a *echoAppender) SetOptions(opts *storage.AppendOptions) { +func (b *builder) labelsMatch(seriesLabels labels.Labels, metricLabels []*dto.LabelPair) bool { + if len(seriesLabels) != len(metricLabels)+1 { + return false + } + + for _, seriesLabel := range seriesLabels { + if seriesLabel.Name == model.MetricNameLabel { + continue + } + + found := false + for _, metricLabel := range metricLabels { + if string(seriesLabel.Name) == metricLabel.GetName() && string(seriesLabel.Value) == metricLabel.GetValue() { + found = true + break + } + } + if !found { + return false + } + } + + return true +} + +func (b *builder) sortFamilies() { + sort.Slice(b.families, func(i, j int) bool { + return b.families[i].GetName() < b.families[j].GetName() + }) + + for _, family := range b.families { + sort.Slice(family.Metric, func(i, j int) bool { + return b.compareMetrics(family.Metric[i], family.Metric[j]) + }) + } +} + +func (b *builder) compareMetrics(a, bb *dto.Metric) bool { + aLabels := make([]string, 0, len(a.Label)) + bLabels := make([]string, 0, len(bb.Label)) + + for _, label := range a.Label { + aLabels = append(aLabels, label.GetName()+"="+label.GetValue()) + } + for _, label := range bb.Label { + bLabels = append(bLabels, label.GetName()+"="+label.GetValue()) + } + + sort.Strings(aLabels) + sort.Strings(bLabels) + + for i := 0; i < len(aLabels) && i < len(bLabels); i++ { + if aLabels[i] != bLabels[i] { + return aLabels[i] < bLabels[i] + } + } + + return len(aLabels) < len(bLabels) } diff --git a/internal/component/prometheus/echo/echo_test.go b/internal/component/prometheus/echo/echo_test.go index 42998d6e408..2e93f567e83 100644 --- a/internal/component/prometheus/echo/echo_test.go +++ b/internal/component/prometheus/echo/echo_test.go @@ -141,3 +141,85 @@ func TestComponent_Update(t *testing.T) { err = comp.Update(Arguments{}) require.NoError(t, err) } + +func TestAppender_WithExpfmtEncoding(t *testing.T) { + var loggedOutput string + + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + for i := 0; i < len(keyvals); i += 2 { + if keyvals[i] == "metrics" && i+1 < len(keyvals) { + loggedOutput = keyvals[i+1].(string) + } + } + return nil + }) + + comp, err := New(component.Options{ + ID: "test", + Logger: logger, + }, Arguments{}) + require.NoError(t, err) + + ctx := context.Background() + appender := comp.Appender(ctx) + + lbls := labels.FromStrings("__name__", "test_metric", "job", "test_job", "instance", "localhost:8080") + _, err = appender.Append(0, lbls, time.Now().Unix(), 42.0) + require.NoError(t, err) + + err = appender.Commit() + require.NoError(t, err) + + require.NotEmpty(t, loggedOutput) + require.Contains(t, loggedOutput, "test_metric") + require.Contains(t, loggedOutput, "job=\"test_job\"") + require.Contains(t, loggedOutput, "instance=\"localhost:8080\"") + require.Contains(t, loggedOutput, "42") + + require.NotContains(t, loggedOutput, "Prometheus metrics received by") + require.NotContains(t, loggedOutput, "Timestamp:") +} + +func TestAppender_WithOpenMetricsFormat(t *testing.T) { + var loggedOutput string + + logger := log.LoggerFunc(func(keyvals ...interface{}) error { + for i := 0; i < len(keyvals); i += 2 { + if keyvals[i] == "metrics" && i+1 < len(keyvals) { + loggedOutput = keyvals[i+1].(string) + } + } + return nil + }) + + args := Arguments{Format: "openmetrics"} + + comp, err := New(component.Options{ + ID: "test", + Logger: logger, + }, args) + require.NoError(t, err) + + ctx := context.Background() + appender := comp.Appender(ctx) + + lbls := labels.FromStrings("__name__", "test_metric", "job", "test_job") + _, err = appender.Append(0, lbls, time.Now().Unix(), 42.0) + require.NoError(t, err) + + err = appender.Commit() + require.NoError(t, err) + + require.NotEmpty(t, loggedOutput) + require.Contains(t, loggedOutput, "test_metric") + require.Contains(t, loggedOutput, "job=\"test_job\"") + + t.Logf("OpenMetrics output: %s", loggedOutput) +} + +func TestArguments_Defaults(t *testing.T) { + args := Arguments{} + args.SetToDefault() + + require.Equal(t, "text", args.Format) +} From df366c9c1b99bb2229f6b1b63f38e03c4508e4e2 Mon Sep 17 00:00:00 2001 From: iamrajiv Date: Wed, 8 Oct 2025 00:54:29 +0530 Subject: [PATCH 05/24] add doc --- .../components/prometheus/prometheus.echo.md | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 docs/sources/reference/components/prometheus/prometheus.echo.md diff --git a/docs/sources/reference/components/prometheus/prometheus.echo.md b/docs/sources/reference/components/prometheus/prometheus.echo.md new file mode 100644 index 00000000000..a65a63d201d --- /dev/null +++ b/docs/sources/reference/components/prometheus/prometheus.echo.md @@ -0,0 +1,124 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.echo/ +aliases: + - ../prometheus.echo/ # /docs/alloy/latest/reference/components/prometheus.echo/ +description: Learn about prometheus.echo +labels: + stage: general-availability + products: + - oss +title: prometheus.echo +--- + +# `prometheus.echo` + +The `prometheus.echo` component receives Prometheus metrics and writes them to stdout in Prometheus exposition format. +This component is useful for debugging and testing the flow of metrics through a pipeline, allowing you to see exactly what metrics are being received at a particular point in your configuration. + +## Usage + +```alloy +prometheus.echo "