diff --git a/sdk/metric/config.go b/sdk/metric/config.go index cb0be7590ae..66f73ee325c 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -17,12 +17,119 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel/sdk/metric/view" + "go.opentelemetry.io/otel/sdk/resource" +) + // config contains configuration options for a MeterProvider. -type config struct{} +type config struct { + res *resource.Resource + readers map[Reader][]view.Config +} + +// readerSignals returns a force-flush and shutdown function for a +// MeterProvider to call in their respective options. All Readers c contains +// will have their force-flush and shutdown methods unified into returned +// single functions. +func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) { + var fFuncs, sFuncs []func(context.Context) error + for r := range c.readers { + sFuncs = append(sFuncs, r.Shutdown) + fFuncs = append(fFuncs, r.ForceFlush) + } + + return unify(fFuncs), unifyShutdown(sFuncs) +} + +// unify unifies calling all of funcs into a single function call. All errors +// returned from calls to funcs will be unify into a single error return +// value. +func unify(funcs []func(context.Context) error) func(context.Context) error { + return func(ctx context.Context) error { + var errs []error + for _, f := range funcs { + if err := f(ctx); err != nil { + errs = append(errs, err) + } + } + switch len(errs) { + case 0: + return nil + case 1: + return errs[0] + default: + return fmt.Errorf("%v", errs) + } + } +} + +// unifyShutdown unifies calling all of funcs once for a shutdown. If called +// more than once, an ErrReaderShutdown error is returned. +func unifyShutdown(funcs []func(context.Context) error) func(context.Context) error { + f := unify(funcs) + var once sync.Once + return func(ctx context.Context) error { + err := ErrReaderShutdown + once.Do(func() { err = f(ctx) }) + return err + } +} + +// newConfig returns a config configured with options. +func newConfig(options []Option) config { + conf := config{res: resource.Default()} + for _, o := range options { + conf = o.apply(conf) + } + return conf +} // Option applies a configuration option value to a MeterProvider. type Option interface { apply(config) config } -// TODO (#2819): implement provider options. +// optionFunc applies a set of options to a config. +type optionFunc func(config) config + +// apply returns a config with option(s) applied. +func (o optionFunc) apply(conf config) config { + return o(conf) +} + +// WithResource associates a Resource with a MeterProvider. This Resource +// represents the entity producing telemetry and is associated with all Meters +// the MeterProvider will create. +// +// By default, if this Option is not used, the default Resource from the +// go.opentelemetry.io/otel/sdk/resource package will be used. +func WithResource(res *resource.Resource) Option { + return optionFunc(func(conf config) config { + conf.res = res + return conf + }) +} + +// WithReader associates a Reader with a MeterProvider. Any passed view config +// will be used to associate a view with the Reader. If no configs are passed +// the default view will be use for the Reader. +// +// Passing this option multiple times for the same Reader will overwrite. The +// last option passed will be the one used for that Reader. +// +// By default, if this option is not used, the MeterProvider will perform no +// operations; no data will be exported without a Reader. +func WithReader(r Reader, confs ...view.Config) Option { + return optionFunc(func(cfg config) config { + if cfg.readers == nil { + cfg.readers = make(map[Reader][]view.Config) + } + cfg.readers[r] = confs + return cfg + }) +} diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go new file mode 100644 index 00000000000..22917ba4fa0 --- /dev/null +++ b/sdk/metric/config_test.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/sdk/metric/export" + "go.opentelemetry.io/otel/sdk/resource" +) + +type reader struct { + producer producer + collectFunc func(context.Context) (export.Metrics, error) + forceFlushFunc func(context.Context) error + shutdownFunc func(context.Context) error +} + +var _ Reader = (*reader)(nil) + +func (r *reader) register(p producer) { r.producer = p } +func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) } +func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) } +func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) } + +func TestConfigReaderSignalsEmpty(t *testing.T) { + f, s := config{}.readerSignals() + + require.NotNil(t, f) + require.NotNil(t, s) + + ctx := context.Background() + assert.Nil(t, f(ctx)) + assert.Nil(t, s(ctx)) + assert.ErrorIs(t, s(ctx), ErrReaderShutdown) +} + +func TestConfigReaderSignalsForwarded(t *testing.T) { + var flush, sdown int + r := &reader{ + forceFlushFunc: func(ctx context.Context) error { + flush++ + return nil + }, + shutdownFunc: func(ctx context.Context) error { + sdown++ + return nil + }, + } + c := newConfig([]Option{WithReader(r)}) + f, s := c.readerSignals() + + require.NotNil(t, f) + require.NotNil(t, s) + + ctx := context.Background() + assert.NoError(t, f(ctx)) + assert.NoError(t, f(ctx)) + assert.NoError(t, s(ctx)) + assert.ErrorIs(t, s(ctx), ErrReaderShutdown) + + assert.Equal(t, 2, flush, "flush not called 2 times") + assert.Equal(t, 1, sdown, "shutdown not called 1 time") +} + +func TestConfigReaderSignalsForwardedErrors(t *testing.T) { + r := &reader{ + forceFlushFunc: func(ctx context.Context) error { return assert.AnError }, + shutdownFunc: func(ctx context.Context) error { return assert.AnError }, + } + c := newConfig([]Option{WithReader(r)}) + f, s := c.readerSignals() + + require.NotNil(t, f) + require.NotNil(t, s) + + ctx := context.Background() + assert.ErrorIs(t, f(ctx), assert.AnError) + assert.ErrorIs(t, s(ctx), assert.AnError) + assert.ErrorIs(t, s(ctx), ErrReaderShutdown) +} + +func TestUnifyMultiError(t *testing.T) { + f := func(context.Context) error { return assert.AnError } + funcs := []func(context.Context) error{f, f, f} + errs := []error{assert.AnError, assert.AnError, assert.AnError} + target := fmt.Errorf("%v", errs) + assert.Equal(t, unify(funcs)(context.Background()), target) +} + +func TestWithResource(t *testing.T) { + res := resource.NewSchemaless() + c := newConfig([]Option{WithResource(res)}) + assert.Same(t, res, c.res) +} + +func TestWithReader(t *testing.T) { + r := &reader{} + c := newConfig([]Option{WithReader(r)}) + assert.Contains(t, c.readers, r) +} diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index 27c470ead9f..933ed548e2f 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.7.1 go.opentelemetry.io/otel v1.7.0 go.opentelemetry.io/otel/metric v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/otel/sdk v1.7.0 ) replace go.opentelemetry.io/otel => ../.. @@ -13,3 +14,5 @@ replace go.opentelemetry.io/otel => ../.. replace go.opentelemetry.io/otel/metric => ../../metric replace go.opentelemetry.io/otel/trace => ../../trace + +replace go.opentelemetry.io/otel/sdk => ../ diff --git a/sdk/metric/go.sum b/sdk/metric/go.sum index f6a0b224951..ac3360c6fee 100644 --- a/sdk/metric/go.sum +++ b/sdk/metric/go.sum @@ -12,6 +12,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 7b0da769e0f..2b7be845933 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -21,6 +21,7 @@ import ( "context" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/resource" ) // MeterProvider handles the creation and coordination of Meters. All Meters @@ -28,7 +29,9 @@ import ( // the same Views applied to them, and have their produced metric telemetry // passed to the configured Readers. type MeterProvider struct { - // TODO (#2820): implement. + res *resource.Resource + + forceFlush, shutdown func(context.Context) error } // Compile-time check MeterProvider implements metric.MeterProvider. @@ -41,8 +44,15 @@ var _ metric.MeterProvider = (*MeterProvider)(nil) // created. This means the returned MeterProvider, one created with no // Readers, will be perform no operations. func NewMeterProvider(options ...Option) *MeterProvider { - // TODO (#2820): implement. - return &MeterProvider{} + conf := newConfig(options) + + flush, sdown := conf.readerSignals() + + return &MeterProvider{ + res: conf.res, + forceFlush: flush, + shutdown: sdown, + } } // Meter returns a Meter with the given name and configured with options. @@ -74,8 +84,9 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri // // This method is safe to call concurrently. func (mp *MeterProvider) ForceFlush(ctx context.Context) error { - // TODO (#2820): implement. - // TODO: test this is concurrent safe. + if mp.forceFlush != nil { + return mp.forceFlush(ctx) + } return nil } @@ -83,7 +94,8 @@ func (mp *MeterProvider) ForceFlush(ctx context.Context) error { // releasing any held computational resources. // // This call is idempotent. The first call will perform all flush and -// releasing operations. Subsequent calls will perform no action. +// releasing operations. Subsequent calls will perform no action and will +// return an error stating this. // // Measurements made by instruments from meters this MeterProvider created // will not be exported after Shutdown is called. @@ -95,7 +107,8 @@ func (mp *MeterProvider) ForceFlush(ctx context.Context) error { // // This method is safe to call concurrently. func (mp *MeterProvider) Shutdown(ctx context.Context) error { - // TODO (#2820): implement. - // TODO: test this is concurrent safe. + if mp.shutdown != nil { + return mp.shutdown(ctx) + } return nil } diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go new file mode 100644 index 00000000000..dbcc43f4949 --- /dev/null +++ b/sdk/metric/provider_test.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestForceFlushConcurrentSafe(t *testing.T) { + mp := NewMeterProvider() + + go func() { + _ = mp.ForceFlush(context.Background()) + }() + + _ = mp.ForceFlush(context.Background()) +} + +func TestShutdownConcurrentSafe(t *testing.T) { + mp := NewMeterProvider() + + go func() { + _ = mp.Shutdown(context.Background()) + }() + + _ = mp.Shutdown(context.Background()) +} + +func TestForceFlushDoesNotPanicForEmptyMeterProvider(t *testing.T) { + mp := MeterProvider{} + assert.NotPanics(t, func() { _ = mp.ForceFlush(context.Background()) }) +} + +func TestShutdownDoesNotPanicForEmptyMeterProvider(t *testing.T) { + mp := MeterProvider{} + assert.NotPanics(t, func() { _ = mp.Shutdown(context.Background()) }) +}