Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric`. (#7492)
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443)
- Improve performance of concurrent synchronous gauge measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7478)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
7 changes: 4 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {

// LastValue returns a last-value aggregate function input and output.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
lv := newDeltaLastValue[N](b.AggregationLimit, b.resFunc())
return b.filter(lv.measure), lv.collect
default:
return b.filter(lv.measure), lv.cumulative
lv := newCumulativeLastValue[N](b.AggregationLimit, b.resFunc())
return b.filter(lv.measure), lv.collect
}
}

Expand Down
232 changes: 133 additions & 99 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,117 +5,179 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg

import (
"context"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
// lastValuePoint is timestamped measurement data.
type lastValuePoint[N int64 | float64] struct {
attrs attribute.Set
value N
value atomicN[N]
res FilteredExemplarReservoir[N]
}

func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
// lastValue summarizes a set of measurements as the last one made.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// lastValue summarizes a set of measurements as the last one made.
// lastValueMap summarizes a set of measurements as the last one made.

// lastValueMap summarizes a set of measurements as the last one made,
// tracked per distinct attribute set.
type lastValueMap[N int64 | float64] struct {
// newRes builds the exemplar reservoir for a newly observed attribute set.
newRes func(attribute.Set) FilteredExemplarReservoir[N]
// values is a concurrent map from attribute set to *lastValuePoint[N],
// bounded by its aggregation cardinality limit (aggLimit).
values limitedSyncMap
}

// measure records value as the latest measurement for fltrAttr and offers
// it to the attribute set's exemplar reservoir. Safe for concurrent use:
// the point is created once via LoadOrStoreAttr and the value is stored
// atomically, so concurrent writers race with last-write-wins semantics.
func (s *lastValueMap[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
// Lazily create the point so a reservoir is only allocated for
// attribute sets that are actually observed.
lv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
return &lastValuePoint[N]{
res: s.newRes(attr),
attrs: attr,
}
}).(*lastValuePoint[N])

// Atomic store: no lock is held while overwriting the last value.
lv.value.Store(value)
lv.res.Offer(ctx, value, droppedAttr)
}

func newDeltaLastValue[N int64 | float64](
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *deltaLastValue[N] {
return &deltaLastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]*datapoint[N]),
start: now(),
hotColdValMap: [2]lastValueMap[N]{
{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
},
}
}

// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex

// deltaLastValue summarizes a set of measurements as the last one made.
type deltaLastValue[N int64 | float64] struct {
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]*datapoint[N]
start time.Time
}

func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
s.Lock()
defer s.Unlock()

d, ok := s.values[fltrAttr.Equivalent()]
if !ok {
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
d = &datapoint[N]{
res: s.newRes(fltrAttr),
attrs: fltrAttr,
}
}

d.value = value
d.res.Offer(ctx, value, droppedAttr)
hcwg hotColdWaitGroup
hotColdValMap [2]lastValueMap[N]
}

s.values[fltrAttr.Equivalent()] = d
// measure records value into whichever of the two value maps is currently
// "hot". The start/done pair on the hotColdWaitGroup brackets the write so
// that collect, after swapping hot and cold, can wait for all in-flight
// writes to the now-cold map to drain before reading and clearing it.
func (s *deltaLastValue[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
hotIdx := s.hcwg.start()
defer s.hcwg.done(hotIdx)
s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr)
}

func (s *lastValue[N]) delta(
func (s *deltaLastValue[N]) collect(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
n := s.copyAndClearDpts(dest, t)
// Update start time for delta temporality.
s.start = t
return n
}

// copyAndClearDpts copies the lastValuePoints held by s into dest. The number of lastValuePoints
// copied is returned.
func (s *deltaLastValue[N]) copyAndClearDpts(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
t time.Time,
) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
// the lastValuePoints is missed (better luck next time).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// the lastValuePoints is missed (better luck next time).
// the DataPoints is missed (better luck next time).

This is still referring to the dest field.

gData, _ := (*dest).(metricdata.Gauge[N])
// delta always clears values on collection
readIdx := s.hcwg.swapHotAndWait()
// The len will not change while we iterate over values, since we waited
// for all writes to finish to the cold values and len.
n := s.hotColdValMap[readIdx].values.Len()
dPts := reset(gData.DataPoints, n, n)

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints, t)
var i int
s.hotColdValMap[readIdx].values.Range(func(_, value any) bool {
v := value.(*lastValuePoint[N])
dPts[i].Attributes = v.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = v.value.Load()
collectExemplars[N](&dPts[i].Exemplars, v.res.Collect)
i++
return true
})
gData.DataPoints = dPts
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t

s.hotColdValMap[readIdx].values.Clear()
*dest = gData

return n
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return n
return i

If range for some reason does not iterate over n values, this will be incorrect.

}

func (s *lastValue[N]) cumulative(
// cumulativeLastValue summarizes a set of measurements as the last one made.
// Unlike deltaLastValue it keeps values across collections and reports a
// fixed start time on every data point.
type cumulativeLastValue[N int64 | float64] struct {
// Embedded map of per-attribute-set last values; a single map suffices
// because cumulative collection never clears it.
lastValueMap[N]
// start is set once at construction and used as StartTime for all points.
start time.Time
}

// newCumulativeLastValue returns a last-value aggregator with cumulative
// temporality: collected points carry the constructor-time start timestamp
// and recorded values are retained (not cleared) across collections.
//
// limit bounds the number of distinct attribute sets tracked; r builds the
// exemplar reservoir for each new attribute set.
func newCumulativeLastValue[N int64 | float64](
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *cumulativeLastValue[N] {
return &cumulativeLastValue[N]{
lastValueMap: lastValueMap[N]{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
start: now(),
}
}

func (s *cumulativeLastValue[N]) collect(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
// the lastValuePoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()
// Values are being concurrently written while we iterate, so only use the
// current length for capacity.
dPts := reset(gData.DataPoints, 0, s.values.Len())

n := s.copyDpts(&gData.DataPoints, t)
var i int
s.values.Range(func(_, value any) bool {
v := value.(*lastValuePoint[N])
newPt := metricdata.DataPoint[N]{
Attributes: v.attrs,
StartTime: s.start,
Time: t,
Value: v.value.Load(),
}
collectExemplars[N](&newPt.Exemplars, v.res.Collect)
dPts = append(dPts, newPt)
i++
return true
})
gData.DataPoints = dPts
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
*dest = gData

return n
}

// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
n := len(s.values)
*dest = reset(*dest, n, n)

var i int
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = v.value
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
return n
return i
}

// newPrecomputedLastValue returns an aggregator that summarizes a set of
Expand All @@ -124,51 +186,23 @@ func newPrecomputedLastValue[N int64 | float64](
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
return &precomputedLastValue[N]{deltaLastValue: newDeltaLastValue[N](limit, r)}
}

// precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct {
*lastValue[N]
*deltaLastValue[N]
}

func (s *precomputedLastValue[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t

*dest = gData

return n
return s.collect(dest)
}

func (s *precomputedLastValue[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
*dest = gData

return n
// Do not reset the start time.
return s.copyAndClearDpts(dest, now())
}