Skip to content

Commit

Permalink
Reimplement histogram using mutex instead of stateLocker (open-teleme…
Browse files Browse the repository at this point in the history
…try#669)

* Reimplement histogram using mutex instead of stateLocker

Move existing implementation to histogram_statelocker.go. Implement
benchmarks for single thread and parallel histogram updates comparing
mutex version to stateLocker version

* Drop statelocker implementation and alignment tests, benchmarks

Co-authored-by: Joshua MacDonald <[email protected]>
  • Loading branch information
evantorrie and jmacd authored Apr 29, 2020
1 parent bd16ce0 commit e4ec924
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 107 deletions.
98 changes: 34 additions & 64 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,21 @@ package histogram // import "go.opentelemetry.io/otel/sdk/metric/aggregator/hist
import (
"context"
"sort"
"sync"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/internal"
)

type (
// Aggregator observe events and counts them in pre-determined buckets.
// It also calculates the sum and count of all events.
Aggregator struct {
// This aggregator uses the StateLocker that enables a lock-free Update()
// in exchange of a blocking and consistent Checkpoint(). Since Checkpoint()
// is called by the sdk itself and it is not part of a hot path,
// the user is not impacted by these blocking calls.
//
// The algorithm keeps two states. At every instance of time there exist one current state,
// in which new updates are aggregated, and one checkpoint state, that represents the state
// since the last Checkpoint(). These states are swapped when a `Checkpoint()` occur.

// states needs to be aligned for 64-bit atomic operations.
states [2]state
lock internal.StateLocker
lock sync.Mutex
current state
checkpoint state
boundaries []core.Number
kind core.NumberKind
}
Expand Down Expand Up @@ -84,18 +75,10 @@ func New(desc *metric.Descriptor, boundaries []core.Number) *Aggregator {
agg := Aggregator{
kind: desc.NumberKind(),
boundaries: boundaries,
states: [2]state{
{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
},
},
{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
},
current: state{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]core.Number, len(boundaries)+1),
},
},
}
Expand All @@ -106,64 +89,60 @@ func New(desc *metric.Descriptor, boundaries []core.Number) *Aggregator {
func (c *Aggregator) Sum() (core.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().sum, nil
return c.checkpoint.sum, nil
}

// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
c.lock.Lock()
defer c.lock.Unlock()
return int64(c.checkpoint().count), nil
return int64(c.checkpoint.count), nil
}

// Histogram returns the count of events in pre-determined buckets.
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().buckets, nil
return c.checkpoint.buckets, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
c.lock.SwapActiveState(c.resetCheckpoint)
}

// checkpoint returns the checkpoint state by inverting the lower bit of generationAndHotIdx.
func (c *Aggregator) checkpoint() *state {
return &c.states[c.lock.ColdIdx()]
c.lock.Lock()
c.checkpoint, c.current = c.current, c.emptyState()
c.lock.Unlock()
}

func (c *Aggregator) resetCheckpoint() {
checkpoint := c.checkpoint()

checkpoint.count.SetUint64(0)
checkpoint.sum.SetNumber(core.Number(0))
checkpoint.buckets.Counts = make([]core.Number, len(checkpoint.buckets.Counts))
func (c *Aggregator) emptyState() state {
return state{
buckets: aggregator.Buckets{
Boundaries: c.boundaries,
Counts: make([]core.Number, len(c.boundaries)+1),
},
}
}

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *metric.Descriptor) error {
kind := desc.NumberKind()

cIdx := c.lock.Start()
defer c.lock.End(cIdx)

current := &c.states[cIdx]
current.count.AddUint64Atomic(1)
current.sum.AddNumberAtomic(kind, number)

bucketID := len(c.boundaries)
for i, boundary := range c.boundaries {
if number.CompareNumber(kind, boundary) < 0 {
current.buckets.Counts[i].AddUint64Atomic(1)
return nil
bucketID = i
break
}
}

// Observed event is bigger than all defined boundaries.
current.buckets.Counts[len(c.boundaries)].AddUint64Atomic(1)
c.lock.Lock()
defer c.lock.Unlock()

c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
c.current.buckets.Counts[bucketID].AddUint64(1)

return nil
}
Expand All @@ -175,20 +154,11 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

// Lock() synchronize Merge() and Checkpoint() to make sure all operations of
// Merge() is done to the same state.
c.lock.Lock()
defer c.lock.Unlock()

current := c.checkpoint()
// We assume that the aggregator being merged is not being updated nor checkpointed or this could be inconsistent.
ocheckpoint := o.checkpoint()

current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)
current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)

for i := 0; i < len(current.buckets.Counts); i++ {
current.buckets.Counts[i].AddNumber(core.Uint64NumberKind, ocheckpoint.buckets.Counts[i])
for i := 0; i < len(c.checkpoint.buckets.Counts); i++ {
c.checkpoint.buckets.Counts[i].AddNumber(core.Uint64NumberKind, o.checkpoint.buckets.Counts[i])
}
return nil
}
Expand Down
47 changes: 8 additions & 39 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@ import (
"context"
"math"
"math/rand"
"os"
"sort"
"testing"
"unsafe"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
ottest "go.opentelemetry.io/otel/internal/testing"
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
)

Expand Down Expand Up @@ -67,34 +64,6 @@ var (
}
)

// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Aggregator.states",
Offset: unsafe.Offsetof(Aggregator{}.states),
},
{
Name: "state.buckets",
Offset: unsafe.Offsetof(state{}.buckets),
},
{
Name: "state.sum",
Offset: unsafe.Offsetof(state{}.sum),
},
{
Name: "state.count",
Offset: unsafe.Offsetof(state{}.count),
},
}

if !ottest.Aligned8Byte(fields, os.Stderr) {
os.Exit(1)
}

os.Exit(m.Run())
}

func TestHistogramAbsolute(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
histogram(t, profile, positiveOnly)
Expand Down Expand Up @@ -145,12 +114,12 @@ func histogram(t *testing.T, profile test.Profile, policy policy) {
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.Nil(t, err)

require.Equal(t, len(agg.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg.checkpoint().buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint().buckets.Counts)
bCount := agg.checkpoint.buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts)
}
}

Expand Down Expand Up @@ -196,12 +165,12 @@ func TestHistogramMerge(t *testing.T) {
require.Equal(t, all.Count(), count, "Same count - absolute")
require.Nil(t, err)

require.Equal(t, len(agg1.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg1.checkpoint().buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint().buckets.Counts)
bCount := agg1.checkpoint.buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts)
}
})
}
Expand All @@ -223,8 +192,8 @@ func TestHistogramNotSet(t *testing.T) {
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.Nil(t, err)

require.Equal(t, len(agg.checkpoint().buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint().buckets.Counts {
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint.buckets.Counts {
require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i)
}
})
Expand Down
4 changes: 0 additions & 4 deletions sdk/metric/histogram_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// This test is too large for the race detector. This SDK uses no locks
// that the race detector would help with, anyway.
// +build !race

package metric_test

import (
Expand Down

0 comments on commit e4ec924

Please sign in to comment.