Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch MinMaxSumCount to a mutex lock instead of StateLocker #667

Merged
merged 4 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 44 additions & 82 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,25 @@ package minmaxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator

import (
"context"
"sync"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/internal"
)

type (
// Aggregator aggregates measure events, keeping only the max,
// Aggregator aggregates measure events, keeping only the min, max,
// sum, and count.
Aggregator struct {
// states has to be aligned for 64-bit atomic operations.
states [2]state
lock internal.StateLocker
kind core.NumberKind
lock sync.Mutex
current state
checkpoint state
kind core.NumberKind
}

state struct {
// all fields have to be aligned for 64-bit atomic operations.
count core.Number
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
sum core.Number
min core.Number
Expand All @@ -47,27 +46,18 @@ var _ export.Aggregator = &Aggregator{}
var _ aggregator.MinMaxSumCount = &Aggregator{}

// New returns a new measure aggregator for computing min, max, sum, and
// count. It does not compute quantile information other than Max.
// count. It does not compute quantile information other than Min and Max.
//
// This aggregator uses the StateLocker pattern to guarantee
// the count, sum, min and max are consistent within a checkpoint
// This type uses a mutex for Update() and Checkpoint() concurrency.
func New(desc *metric.Descriptor) *Aggregator {
kind := desc.NumberKind()
return &Aggregator{
kind: kind,
states: [2]state{
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
current: state{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
}
}
Expand All @@ -76,14 +66,14 @@ func New(desc *metric.Descriptor) *Aggregator {
func (c *Aggregator) Sum() (core.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().sum, nil
return c.checkpoint.sum, nil
}

// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil
return c.checkpoint.count.CoerceToInt64(core.Uint64NumberKind), nil
}

// Min returns the minimum value in the checkpoint.
Expand All @@ -92,10 +82,10 @@ func (c *Aggregator) Count() (int64, error) {
func (c *Aggregator) Min() (core.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
if c.checkpoint.count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrNoData
}
return c.checkpoint().min, nil
return c.checkpoint.min, nil
}

// Max returns the maximum value in the checkpoint.
Expand All @@ -104,64 +94,44 @@ func (c *Aggregator) Min() (core.Number, error) {
func (c *Aggregator) Max() (core.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
if c.checkpoint.count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrNoData
}
return c.checkpoint().max, nil
return c.checkpoint.max, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
c.lock.SwapActiveState(c.resetCheckpoint)
}

// checkpoint returns the "cold" state, i.e. state collected prior to the
// most recent Checkpoint() call
func (c *Aggregator) checkpoint() *state {
return &c.states[c.lock.ColdIdx()]
c.lock.Lock()
c.checkpoint, c.current = c.current, c.emptyState()
c.lock.Unlock()
}

func (c *Aggregator) resetCheckpoint() {
checkpoint := c.checkpoint()

checkpoint.count.SetUint64(0)
checkpoint.sum.SetNumber(c.kind.Zero())
checkpoint.min.SetNumber(c.kind.Maximum())
checkpoint.max.SetNumber(c.kind.Minimum())
func (c *Aggregator) emptyState() state {
kind := c.kind
return state{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
}
}

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *metric.Descriptor) error {
kind := desc.NumberKind()

cIdx := c.lock.Start()
defer c.lock.End(cIdx)

current := &c.states[cIdx]
current.count.AddUint64Atomic(1)
current.sum.AddNumberAtomic(kind, number)

for {
cmin := current.min.AsNumberAtomic()

if number.CompareNumber(kind, cmin) >= 0 {
break
}
if current.min.CompareAndSwapNumber(cmin, number) {
break
}
c.lock.Lock()
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
if number.CompareNumber(kind, c.current.min) < 0 {
c.current.min = number
}
for {
cmax := current.max.AsNumberAtomic()

if number.CompareNumber(kind, cmax) <= 0 {
break
}
if current.max.CompareAndSwapNumber(cmax, number) {
break
}
if number.CompareNumber(kind, c.current.max) > 0 {
c.current.max = number
}
c.lock.Unlock()
return nil
}

Expand All @@ -172,22 +142,14 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

// Lock() synchronizes Merge() and Checkpoint() to ensure all operations of
// Merge() are performed on the same state.
c.lock.Lock()
defer c.lock.Unlock()

current := c.checkpoint()
ocheckpoint := o.checkpoint()

current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
jmacd marked this conversation as resolved.
Show resolved Hide resolved
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)

if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 {
current.min.SetNumber(ocheckpoint.min)
if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
c.checkpoint.min.SetNumber(o.checkpoint.min)
}
if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 {
current.max.SetNumber(ocheckpoint.max)
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
}
return nil
}
Loading