Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WithReader and WithResource Options #2905

Merged
merged 8 commits into from
May 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 109 additions & 2 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,119 @@

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

// config contains configuration options for a MeterProvider.
type config struct{}
type config struct {
res *resource.Resource
readers map[Reader][]view.Config
}

// readerSignals returns a force-flush and shutdown function for a
// MeterProvider to call in their respective options. All Readers c contains
// will have their force-flush and shutdown methods unified into returned
// single functions.
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
var fFuncs, sFuncs []func(context.Context) error
for r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
}

return unify(fFuncs), unifyShutdown(sFuncs)
}

// unify unifies calling all of funcs into a single function call. All errors
// returned from calls to funcs will be unify into a single error return
// value.
func unify(funcs []func(context.Context) error) func(context.Context) error {
return func(ctx context.Context) error {
var errs []error
for _, f := range funcs {
if err := f(ctx); err != nil {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}
}

// unifyShutdown unifies calling all of funcs once for a shutdown. If called
// more than once, an ErrReaderShutdown error is returned.
func unifyShutdown(funcs []func(context.Context) error) func(context.Context) error {
f := unify(funcs)
var once sync.Once
return func(ctx context.Context) error {
err := ErrReaderShutdown
once.Do(func() { err = f(ctx) })
return err
}
}

// newConfig returns a config configured with options.
func newConfig(options []Option) config {
conf := config{res: resource.Default()}
for _, o := range options {
conf = o.apply(conf)
}
return conf
}

// Option applies a configuration option value to a MeterProvider.
type Option interface {
apply(config) config
}

// TODO (#2819): implement provider options.
// optionFunc applies a set of options to a config.
type optionFunc func(config) config

// apply returns a config with option(s) applied.
func (o optionFunc) apply(conf config) config {
return o(conf)
}

// WithResource associates a Resource with a MeterProvider. This Resource
// represents the entity producing telemetry and is associated with all Meters
// the MeterProvider will create.
//
// By default, if this Option is not used, the default Resource from the
// go.opentelemetry.io/otel/sdk/resource package will be used.
func WithResource(res *resource.Resource) Option {
return optionFunc(func(conf config) config {
conf.res = res
return conf
})
}

// WithReader associates a Reader with a MeterProvider. Any passed view config
// will be used to associate a view with the Reader. If no configs are passed
// the default view will be use for the Reader.
//
// Passing this option multiple times for the same Reader will overwrite. The
// last option passed will be the one used for that Reader.
//
// By default, if this option is not used, the MeterProvider will perform no
// operations; no data will be exported without a Reader.
func WithReader(r Reader, confs ...view.Config) Option {
return optionFunc(func(cfg config) config {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.Config)
}
cfg.readers[r] = confs
return cfg
})
}
121 changes: 121 additions & 0 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/resource"
)

type reader struct {
producer producer
collectFunc func(context.Context) (export.Metrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }

func TestConfigReaderSignalsEmpty(t *testing.T) {
f, s := config{}.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.Nil(t, f(ctx))
assert.Nil(t, s(ctx))
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)
}

func TestConfigReaderSignalsForwarded(t *testing.T) {
var flush, sdown int
r := &reader{
forceFlushFunc: func(ctx context.Context) error {
flush++
return nil
},
shutdownFunc: func(ctx context.Context) error {
sdown++
return nil
},
}
c := newConfig([]Option{WithReader(r)})
f, s := c.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.NoError(t, f(ctx))
assert.NoError(t, f(ctx))
assert.NoError(t, s(ctx))
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)

assert.Equal(t, 2, flush, "flush not called 2 times")
assert.Equal(t, 1, sdown, "shutdown not called 1 time")
}

func TestConfigReaderSignalsForwardedErrors(t *testing.T) {
r := &reader{
forceFlushFunc: func(ctx context.Context) error { return assert.AnError },
shutdownFunc: func(ctx context.Context) error { return assert.AnError },
}
c := newConfig([]Option{WithReader(r)})
f, s := c.readerSignals()

require.NotNil(t, f)
require.NotNil(t, s)

ctx := context.Background()
assert.ErrorIs(t, f(ctx), assert.AnError)
assert.ErrorIs(t, s(ctx), assert.AnError)
assert.ErrorIs(t, s(ctx), ErrReaderShutdown)
}

func TestUnifyMultiError(t *testing.T) {
f := func(context.Context) error { return assert.AnError }
funcs := []func(context.Context) error{f, f, f}
errs := []error{assert.AnError, assert.AnError, assert.AnError}
target := fmt.Errorf("%v", errs)
assert.Equal(t, unify(funcs)(context.Background()), target)
}

func TestWithResource(t *testing.T) {
res := resource.NewSchemaless()
c := newConfig([]Option{WithResource(res)})
assert.Same(t, res, c.res)
}

func TestWithReader(t *testing.T) {
r := &reader{}
c := newConfig([]Option{WithReader(r)})
assert.Contains(t, c.readers, r)
}
3 changes: 3 additions & 0 deletions sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000
go.opentelemetry.io/otel/sdk v1.7.0
)

replace go.opentelemetry.io/otel => ../..

replace go.opentelemetry.io/otel/metric => ../../metric

replace go.opentelemetry.io/otel/trace => ../../trace

replace go.opentelemetry.io/otel/sdk => ../
2 changes: 2 additions & 0 deletions sdk/metric/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
29 changes: 21 additions & 8 deletions sdk/metric/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import (
"context"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

// MeterProvider handles the creation and coordination of Meters. All Meters
// created by a MeterProvider will be associated with the same Resource, have
// the same Views applied to them, and have their produced metric telemetry
// passed to the configured Readers.
type MeterProvider struct {
// TODO (#2820): implement.
res *resource.Resource

forceFlush, shutdown func(context.Context) error
}

// Compile-time check MeterProvider implements metric.MeterProvider.
Expand All @@ -41,8 +44,15 @@ var _ metric.MeterProvider = (*MeterProvider)(nil)
// created. This means the returned MeterProvider, one created with no
// Readers, will be perform no operations.
func NewMeterProvider(options ...Option) *MeterProvider {
// TODO (#2820): implement.
return &MeterProvider{}
conf := newConfig(options)

flush, sdown := conf.readerSignals()

return &MeterProvider{
res: conf.res,
forceFlush: flush,
shutdown: sdown,
}
}

// Meter returns a Meter with the given name and configured with options.
Expand Down Expand Up @@ -74,16 +84,18 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri
//
// This method is safe to call concurrently.
func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
// TODO (#2820): implement.
// TODO: test this is concurrent safe.
if mp.forceFlush != nil {
return mp.forceFlush(ctx)
}
return nil
}

// Shutdown shuts down the MeterProvider flushing all pending telemetry and
// releasing any held computational resources.
//
// This call is idempotent. The first call will perform all flush and
// releasing operations. Subsequent calls will perform no action.
// releasing operations. Subsequent calls will perform no action and will
// return an error stating this.
//
// Measurements made by instruments from meters this MeterProvider created
// will not be exported after Shutdown is called.
Expand All @@ -95,7 +107,8 @@ func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
//
// This method is safe to call concurrently.
func (mp *MeterProvider) Shutdown(ctx context.Context) error {
// TODO (#2820): implement.
// TODO: test this is concurrent safe.
if mp.shutdown != nil {
return mp.shutdown(ctx)
}
return nil
}
55 changes: 55 additions & 0 deletions sdk/metric/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestForceFlushConcurrentSafe(t *testing.T) {
mp := NewMeterProvider()

go func() {
_ = mp.ForceFlush(context.Background())
}()

_ = mp.ForceFlush(context.Background())
}

func TestShutdownConcurrentSafe(t *testing.T) {
mp := NewMeterProvider()

go func() {
_ = mp.Shutdown(context.Background())
}()

_ = mp.Shutdown(context.Background())
}

func TestForceFlushDoesNotPanicForEmptyMeterProvider(t *testing.T) {
mp := MeterProvider{}
assert.NotPanics(t, func() { _ = mp.ForceFlush(context.Background()) })
}

func TestShutdownDoesNotPanicForEmptyMeterProvider(t *testing.T) {
mp := MeterProvider{}
assert.NotPanics(t, func() { _ = mp.Shutdown(context.Background()) })
}