Skip to content

Commit

Permalink
processor: enable histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
sh0rez committed Aug 14, 2024
1 parent 1455f98 commit b89bee5
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions processor/deltatocumulativeprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Processor struct {

sums Pipeline[data.Number]
expo Pipeline[data.ExpHistogram]
hist Pipeline[data.Histogram]

mtx sync.Mutex
}
Expand All @@ -52,6 +53,7 @@ func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder,

sums: pipeline[data.Number](cfg, &tel),
expo: pipeline[data.ExpHistogram](cfg, &tel),
hist: pipeline[data.Histogram](cfg, &tel),
}

return &proc
Expand Down Expand Up @@ -93,7 +95,8 @@ func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D
func (p *Processor) Start(_ context.Context, _ component.Host) error {
sums, sok := p.sums.stale.Try()
expo, eok := p.expo.stale.Try()
if !(sok && eok) {
hist, hok := p.hist.stale.Try()
if !(sok && eok && hok) {
return nil
}

Expand All @@ -107,6 +110,7 @@ func (p *Processor) Start(_ context.Context, _ component.Host) error {
p.mtx.Lock()
sums.ExpireOldEntries()
expo.ExpireOldEntries()
hist.ExpireOldEntries()
p.mtx.Unlock()
}
}
Expand Down Expand Up @@ -142,7 +146,12 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
case pmetric.MetricTypeHistogram:
// TODO
hist := m.Histogram()
if hist.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
err := streams.Apply(metrics.Histogram(m), p.hist.aggr.Aggregate)
errs = errors.Join(errs, err)
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
case pmetric.MetricTypeExponentialHistogram:
expo := m.ExponentialHistogram()
if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
Expand Down

0 comments on commit b89bee5

Please sign in to comment.