-
Notifications
You must be signed in to change notification settings - Fork 526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: multi-watcher #19069
base: main
Are you sure you want to change the base?
fix: multi-watcher #19069
Conversation
6e0e5d0
to
aeb0bb0
Compare
domain/keyupdater/service/service.go
Outdated
modelKeysWatcher, err := s.watcherFactory.NewValueWatcher( | ||
"model_authorized_keys", | ||
modelId.String(), | ||
changestream.All, | ||
) | ||
if err != nil { | ||
return nil, errors.Errorf( | ||
"making watcher for machine %q authorized keys when watching model %q authorized key changes: %w", | ||
machineName, modelId, err, | ||
) | ||
} | ||
|
||
userAuthWatcher, err := s.watcherFactory.NewNamespaceNotifyWatcher( | ||
"user_authentication", | ||
changestream.All, | ||
return s.watcherFactory.NewMultiWatcher( | ||
eventsource.ValueFilter( | ||
"model_authorized_keys", | ||
changestream.All, | ||
func(s string) bool { return s == modelId.String() }, | ||
), | ||
eventsource.NamespaceFilter( | ||
"user_authentication", | ||
changestream.All, | ||
), | ||
) | ||
if err != nil { | ||
return nil, errors.Errorf( | ||
"making watcher for machine %q authorized keys when watching user authentication changes: %w", | ||
machineName, err, | ||
) | ||
} | ||
|
||
watcher, err := eventsource.NewMultiNotifyWatcher(ctx, modelKeysWatcher, userAuthWatcher) | ||
if err != nil { | ||
return nil, errors.Errorf( | ||
"making watcher for machine %q when combining user authentication and model authorized keys watcher: %w", | ||
machineName, err, | ||
) | ||
} | ||
|
||
return watcher, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This went from 3 watchers to just 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Been looking forward to this.
I have some quibbles about naming - see what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more nits.
I am concerned about the apparent lack of input validation.
This extends down to FilteredNamespace() which doesn't check that namespace is not "". It does however check that change mask is 0 or not and panics. Ewww. Let's not panic in production code. And because we don't validate input for say ValueFilter
it seems from a quick look you could pass in 0 for change type and then later on get a panic; so we're not even panicing at the root source of the problem.
) *ValueWatcher { | ||
opts := make([]changestream.SubscriptionOption, len(filterOptions)) | ||
for i, opt := range filterOptions { | ||
predicate := opt.ChangePredicate() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no guard that ChangePredicate can never be nil. ValueFilter
call be called with a nil param and there's no check done.
Not as serious but along the same lines, namespace is not checked to ensure it's not ""
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's nil and you didn't add tests, I kinda think that's on you.
Provide better filter options so it's possible to mix both value and namespace watchers in a multi-value watcher.
ffb0ed7
to
015d297
Compare
Deprecates the multi-watcher that composes watchers together, instead exposing the existing changestream filtering to a new multi-watcher. The changestream already had the ability to filter over multiple subscriptions, it just wasn't exposed to the watchers.
The crux of the issue is that a multi-watcher that composes watchers can end up with different terms overlapping and this can break the reasoning of the system. The way that the change stream and event multiplexing works is that if
you make multiple changes in a commit, all of those changes will be dispatched to any watchers in the next term. This makes reasoning about watcher behaviour easy - we think in terms of discrete events and batches of data.
With a MultiWatcher, which composes individual "sub" watchers, we might have multiple notifications for a term depending on the speed of the machine, consumers etc.
Exposing filtering options now removes the need for the following watcher constructors (a follow PR will remove these along with updating the domains):
You just don't need them because the filter options allow you to express the same thing. For example, what was 30+ lines of hairy code, can just become:
Once a follow-up PR removes the deprecated watcher constructors we can rename
NewMultiValueWatcher
andNewMultiValueMapperWatcher
to justNewFilterWatcher
andNewFilterMapperWatcher
respectively.This pull request introduces several changes to the
core/watcher/eventsource
anddomain/keyupdater
packages. The primary focus is on deprecating old watcher functions and introducing new filter-based watcher functions. Additionally, there are updates to the test files to reflect these changes.Deprecations and New Functions:
core/watcher/eventsource/multiwatcher.go
: DeprecatedNewMultiNotifyWatcher
,NewMultiStringsWatcher
, andNewMultiWatcher
functions in favor ofNewMultiValueWatcher
. [1] [2] [3]core/watcher/eventsource/value.go
: Introduced new filter options and functions such asFilterOption
,ValueFilter
,NamespaceFilter
,NewFilterWatcher
, andNewFilterMapperWatcher
. Deprecated several old functions includingNewValueWatcher
,NewValueMapperWatcher
,NewNamespaceNotifyWatcher
, andNewNamespaceNotifyMapperWatcher
. [1] [2] [3]Test Updates:
core/watcher/eventsource/value_test.go
: Updated test functions to use the newsubscriptionOptionMatcher
struct format. [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]domain/keyupdater/keyupdater_test.go
: Re-enabled a previously skipped test and updated test harness usage. [1] [2] [3]Service Changes:
domain/keyupdater/service/service.go
: AddedNewFilterWatcher
to theWatcherFactory
interface and updated theWatchAuthorisedKeysForMachine
method to use the new filter watcher. [1] [2]QA steps
See keyupdater domain test for an example of how this is fixed.