diff --git a/.chloggen/fix-ordering-watcher-confmap.yaml b/.chloggen/fix-ordering-watcher-confmap.yaml new file mode 100644 index 00000000000..b301d8aa7fb --- /dev/null +++ b/.chloggen/fix-ordering-watcher-confmap.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pkg/confmap + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix a potential race condition in confmap by closing the providers first. + +# One or more tracking issues or pull requests related to the change +issues: [14018] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/confmap/provider.go b/confmap/provider.go index 68f74ab834e..ab3e9a18a5b 100644 --- a/confmap/provider.go +++ b/confmap/provider.go @@ -86,6 +86,8 @@ type Provider interface { // This method must be called when the Collector service ends, either in case of // success or error. Retrieve cannot be called after Shutdown. // + // Provider MUST shutdown and wait for any goroutine(s) that were created to call `watcher`, if any. + // // Should never be called concurrently with itself or with Retrieve. // If ctx is cancelled should return immediately with an error. Shutdown(ctx context.Context) error diff --git a/confmap/resolver.go b/confmap/resolver.go index 819ef9c161f..4fa84f01632 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -243,14 +243,14 @@ func (mr *Resolver) Watch() <-chan error { // // Should never be called concurrently with itself or Get. func (mr *Resolver) Shutdown(ctx context.Context) error { - close(mr.watcher) - var errs error errs = multierr.Append(errs, mr.closeIfNeeded(ctx)) for _, p := range mr.providers { errs = multierr.Append(errs, p.Shutdown(ctx)) } + close(mr.watcher) + return errs } diff --git a/confmap/resolver_test.go b/confmap/resolver_test.go index b560d07b7cf..51e9795f1b4 100644 --- a/confmap/resolver_test.go +++ b/confmap/resolver_test.go @@ -524,3 +524,47 @@ func newConfFromFile(tb testing.TB, fileName string) map[string]any { return NewFromStringMap(data).ToStringMap() } + +type provider struct { + wg sync.WaitGroup +} + +func newRaceDetectorProvider() ProviderFactory { + return NewProviderFactory(func(_ ProviderSettings) Provider { + return &provider{} + }) +} + +func (p *provider) Retrieve(_ context.Context, _ string, watcher WatcherFunc) (*Retrieved, error) { + p.wg.Add(1) + go func() { + // mock a config change event and wait for goroutine to return. + defer p.wg.Done() + watcher(&ChangeEvent{}) + }() + return NewRetrieved(map[string]any{}) +} + +func (p *provider) Scheme() string { + return "race" +} + +func (p *provider) Shutdown(context.Context) error { + p.wg.Wait() + return nil +} + +func TestProviderRaceCondition(t *testing.T) { + resolver, err := NewResolver(ResolverSettings{ + URIs: []string{"race:"}, + ProviderFactories: []ProviderFactory{ + newRaceDetectorProvider(), + }, + ConverterFactories: nil, + }) + require.NoError(t, err) + c, err := resolver.Resolve(context.Background()) + require.NoError(t, err) + require.NotNil(t, c) + require.NoError(t, resolver.Shutdown(context.Background())) +}