Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,15 @@ func (p *expoHistogramDataPoint[N]) count() uint64 {
// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
startBin int32
counts []uint64
counts []atomic.Uint64
Comment thread
dashpole marked this conversation as resolved.
}

// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int32) {
if len(b.counts) == 0 {
b.counts = []uint64{1}
b.counts = make([]atomic.Uint64, 1)
b.counts[0].Store(1)
b.startBin = bin
return
}
Expand All @@ -197,7 +198,7 @@ func (b *expoBuckets) record(bin int32) {

// if the new bin is inside the current range
if bin >= b.startBin && int(bin) <= endBin {
b.counts[bin-b.startBin]++
b.counts[bin-b.startBin].Add(1)
return
}
// if the new bin is before the current start add spaces to the counts
Expand All @@ -207,32 +208,38 @@ func (b *expoBuckets) record(bin int32) {
shift := b.startBin - bin

if newLength > cap(b.counts) {
b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
b.counts = append(b.counts, make([]atomic.Uint64, newLength-len(b.counts))...)
}

copy(b.counts[shift:origLen+int(shift)], b.counts)
b.counts = b.counts[:newLength]

// Shift existing elements to the right. Go's copy() doesn't work for
// structs like atomic.Uint64.
for i := origLen - 1; i >= 0; i-- {
b.counts[i+int(shift)].Store(b.counts[i].Load())
}

Comment thread
MrAlias marked this conversation as resolved.
for i := 1; i < int(shift); i++ {
b.counts[i] = 0
b.counts[i].Store(0)
}
b.startBin = bin
b.counts[0] = 1
b.counts[0].Store(1)
return
}
// if the new is after the end add spaces to the end
if int(bin) > endBin {
if int(bin-b.startBin) < cap(b.counts) {
b.counts = b.counts[:bin-b.startBin+1]
for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
b.counts[i] = 0
b.counts[i].Store(0)
}
b.counts[bin-b.startBin] = 1
b.counts[bin-b.startBin].Store(1)
return
}

end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
end := make([]atomic.Uint64, int(bin-b.startBin)-len(b.counts)+1)
b.counts = append(b.counts, end...)
b.counts[bin-b.startBin] = 1
b.counts[bin-b.startBin].Store(1)
}
}

Expand All @@ -259,10 +266,10 @@ func (b *expoBuckets) downscale(delta int32) {
for i := 1; i < len(b.counts); i++ {
idx := i + int(offset)
if idx%int(steps) == 0 {
b.counts[idx/int(steps)] = b.counts[i]
b.counts[idx/int(steps)].Store(b.counts[i].Load())
continue
}
b.counts[idx/int(steps)] += b.counts[i]
b.counts[idx/int(steps)].Add(b.counts[i].Load())
}

lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
Expand All @@ -272,8 +279,8 @@ func (b *expoBuckets) downscale(delta int32) {

func (b *expoBuckets) count() uint64 {
var total uint64
for _, count := range b.counts {
total += count
for i := range b.counts {
total += b.counts[i].Load()
}
return total
}
Expand Down Expand Up @@ -380,15 +387,19 @@ func (e *expoHistogram[N]) delta(
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
for j := range val.posBuckets.counts {
hDPts[i].PositiveBucket.Counts[j] = val.posBuckets.counts[j].Load()
}

hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
hDPts[i].NegativeBucket.Counts,
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
for j := range val.negBuckets.counts {
hDPts[i].NegativeBucket.Counts[j] = val.negBuckets.counts[j].Load()
}

if !e.noSum {
hDPts[i].Sum = val.sum.load()
Expand Down Expand Up @@ -445,15 +456,19 @@ func (e *expoHistogram[N]) cumulative(
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
for j := range val.posBuckets.counts {
hDPts[i].PositiveBucket.Counts[j] = val.posBuckets.counts[j].Load()
}

hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
hDPts[i].NegativeBucket.Counts,
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
for j := range val.negBuckets.counts {
hDPts[i].NegativeBucket.Counts[j] = val.negBuckets.counts[j].Load()
}

if !e.noSum {
hDPts[i].Sum = val.sum.load()
Expand Down
Loading
Loading