Skip to content

Commit aa8ecb3

Browse files
authored
Update GoVPP & aggregated watcher (#1410)
* Update GoVPP Signed-off-by: Ondrej Fabry <[email protected]> * Add aggregated watcher from vpp1908 branch Signed-off-by: Ondrej Fabry <[email protected]>
1 parent b09c3d1 commit aa8ecb3

File tree

12 files changed

+601
-230
lines changed

12 files changed

+601
-230
lines changed

Gopkg.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
+329
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
// Copyright (c) 2019 Cisco and/or its affiliates.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at:
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package watcher
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"strings"
21+
22+
"github.com/ligato/cn-infra/datasync"
23+
"github.com/ligato/cn-infra/datasync/kvdbsync/local"
24+
"github.com/ligato/cn-infra/datasync/resync"
25+
"github.com/ligato/cn-infra/datasync/syncbase"
26+
"github.com/ligato/cn-infra/infra"
27+
"github.com/ligato/cn-infra/logging"
28+
"github.com/ligato/cn-infra/utils/safeclose"
29+
)
30+
31+
// Option is a function that acts on a Plugin to inject Dependencies or configuration
32+
type Option func(*Aggregator)
33+
34+
// UseWatchers returns option that sets watchers.
35+
func UseWatchers(watchers ...datasync.KeyValProtoWatcher) Option {
36+
return func(p *Aggregator) {
37+
p.Watchers = watchers
38+
}
39+
}
40+
41+
// Aggregator is an adapter that allows multiple
42+
// watchers (KeyValProtoWatcher) to be aggregated in one.
43+
// Watch request is delegated to all of them.
44+
type Aggregator struct {
45+
infra.PluginDeps
46+
47+
keyPrefixes []string
48+
localKVs map[string]datasync.KeyVal
49+
50+
Resync *resync.Plugin
51+
Local *syncbase.Registry
52+
Watchers []datasync.KeyValProtoWatcher
53+
}
54+
55+
// NewPlugin creates a new Plugin with the provides Options
56+
func NewPlugin(opts ...Option) *Aggregator {
57+
p := &Aggregator{}
58+
59+
p.PluginName = "aggregator"
60+
p.Local = local.DefaultRegistry
61+
p.Resync = &resync.DefaultPlugin
62+
63+
for _, o := range opts {
64+
o(p)
65+
}
66+
p.PluginDeps.SetupLog()
67+
68+
return p
69+
}
70+
71+
func (p *Aggregator) Init() error {
72+
p.localKVs = map[string]datasync.KeyVal{}
73+
return nil
74+
}
75+
76+
// Watch subscribes to every transport available within transport aggregator
77+
// and also subscribes to localclient (local.Registry).
78+
// The function implements KeyValProtoWatcher.Watch().
79+
func (p *Aggregator) Watch(
80+
resyncName string,
81+
changeChan chan datasync.ChangeEvent,
82+
resyncChan chan datasync.ResyncEvent,
83+
keyPrefixes ...string,
84+
) (datasync.WatchRegistration, error) {
85+
86+
p.keyPrefixes = keyPrefixes
87+
88+
// prepare list of watchers
89+
var watchers []datasync.KeyValProtoWatcher
90+
for _, w := range p.Watchers {
91+
if l, ok := w.(*syncbase.Registry); ok && p.Local != nil && l == p.Local {
92+
p.Log.Warn("found local registry (localclient) in watchers, ignoring it..")
93+
continue
94+
}
95+
watchers = append(watchers, w)
96+
}
97+
p.Watchers = watchers
98+
99+
// start watch for all watchers
100+
p.Log.Infof("Watch for %v with %d prefixes", resyncName, len(keyPrefixes))
101+
102+
aggrResync := make(chan datasync.ResyncEvent, len(watchers))
103+
104+
go p.watchAggrResync(aggrResync, resyncChan)
105+
106+
var registrations []datasync.WatchRegistration
107+
for i, adapter := range watchers {
108+
partChange := make(chan datasync.ChangeEvent)
109+
partResync := make(chan datasync.ResyncEvent)
110+
111+
name := fmt.Sprint(adapter) + "/" + resyncName
112+
watcherReg, err := adapter.Watch(name, changeChan, partResync, keyPrefixes...)
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
go func(i int, chanChange chan datasync.ChangeEvent, chanResync chan datasync.ResyncEvent) {
118+
for {
119+
select {
120+
case e := <-chanChange:
121+
p.Log.Debugf("watcher %d got CHANGE PART, sending to aggregated", i)
122+
changeChan <- e
123+
124+
case e := <-chanResync:
125+
p.Log.Debugf("watcher %d got RESYNC PART, sending to aggregated", i)
126+
aggrResync <- e
127+
}
128+
}
129+
}(i+1, partChange, partResync)
130+
131+
if watcherReg != nil {
132+
registrations = append(registrations, watcherReg)
133+
}
134+
}
135+
136+
// register and watch for localclient
137+
partResync := make(chan datasync.ResyncEvent)
138+
partChange := make(chan datasync.ChangeEvent)
139+
140+
go p.watchLocalEvents(partChange, changeChan, partResync)
141+
142+
name := "LOCAL" + "/" + resyncName
143+
localReg, err := p.Local.Watch(name, partChange, partResync, keyPrefixes...)
144+
if err != nil {
145+
return nil, err
146+
}
147+
148+
p.Log.Debug("added localclient as aggregated watcher")
149+
150+
registrations = append(registrations, localReg)
151+
152+
return &WatchRegistration{
153+
Registrations: registrations,
154+
}, nil
155+
}
156+
157+
func (p *Aggregator) watchAggrResync(aggrResync, resyncCh chan datasync.ResyncEvent) {
158+
aggregatedResync := func(allResyncs []datasync.ResyncEvent) {
159+
var prefixKeyVals = map[string]map[string]datasync.KeyVal{}
160+
161+
kvToKeyVals := func(prefix string, kv datasync.KeyVal) {
162+
keyVals, ok := prefixKeyVals[prefix]
163+
if !ok {
164+
p.Log.Debugf(" - keyval prefix: %v", prefix)
165+
keyVals = map[string]datasync.KeyVal{}
166+
prefixKeyVals[prefix] = keyVals
167+
}
168+
key := kv.GetKey()
169+
if _, ok := keyVals[key]; ok {
170+
p.Log.Warnf("resync from watcher overwrites key: %v", key)
171+
}
172+
keyVals[key] = kv
173+
}
174+
175+
// process resync events from all watchers
176+
p.Log.Debugf("preparing keyvals for aggregated resync from %d cached resyncs", len(allResyncs))
177+
for _, ev := range allResyncs {
178+
for prefix, iterator := range ev.GetValues() {
179+
for {
180+
kv, allReceived := iterator.GetNext()
181+
if allReceived {
182+
break
183+
}
184+
185+
kvToKeyVals(prefix, kv)
186+
}
187+
}
188+
}
189+
190+
// process keyvals from localclient
191+
p.Log.Debugf("preparing localclient keyvals for aggregated resync with %d keyvals", len(allResyncs))
192+
for key, kv := range p.localKVs {
193+
var kvprefix string
194+
for _, prefix := range p.keyPrefixes {
195+
if strings.HasPrefix(key, prefix) {
196+
kvprefix = prefix
197+
break
198+
}
199+
}
200+
if kvprefix == "" {
201+
p.Log.Warnf("not found registered prefix for keyval from localclient with key: %v", key)
202+
}
203+
kvToKeyVals(kvprefix, kv)
204+
}
205+
206+
// prepare aggregated resync
207+
var vals = map[string]datasync.KeyValIterator{}
208+
for prefix, keyVals := range prefixKeyVals {
209+
var data []datasync.KeyVal
210+
for _, kv := range keyVals {
211+
data = append(data, kv)
212+
}
213+
vals[prefix] = syncbase.NewKVIterator(data)
214+
}
215+
resEv := syncbase.NewResyncEventDB(context.Background(), vals)
216+
217+
p.Log.Debugf("sending aggregated resync event (%d prefixes) to original resync channel", len(vals))
218+
resyncCh <- resEv
219+
p.Log.Debugf("aggregated resync was accepted, waiting for done chan")
220+
resErr := <-resEv.DoneChan
221+
p.Log.Debugf("aggregated resync done (err=%v) watchers", resErr)
222+
223+
}
224+
225+
var cachedResyncs []datasync.ResyncEvent
226+
227+
// process resync events from watchers
228+
for {
229+
select {
230+
case e, ok := <-aggrResync:
231+
if !ok {
232+
p.Log.Debugf("aggrResync channel was closed")
233+
return
234+
}
235+
236+
cachedResyncs = append(cachedResyncs, e)
237+
p.Log.Debugf("watchers received resync event (%d/%d watchers done)", len(cachedResyncs), len(p.Watchers))
238+
239+
e.Done(nil)
240+
}
241+
242+
if len(cachedResyncs) == len(p.Watchers) {
243+
p.Log.Debug("resyncs from all watchers received, calling aggregated resync")
244+
aggregatedResync(cachedResyncs)
245+
// clear resyncs
246+
cachedResyncs = nil
247+
}
248+
}
249+
}
250+
251+
func (p *Aggregator) watchLocalEvents(partChange, changeChan chan datasync.ChangeEvent, partResync chan datasync.ResyncEvent) {
252+
for {
253+
select {
254+
case e := <-partChange:
255+
p.Log.Debugf("LOCAL got CHANGE part, %d changes, sending to aggregated", len(e.GetChanges()))
256+
257+
for _, change := range e.GetChanges() {
258+
key := change.GetKey()
259+
switch change.GetChangeType() {
260+
case datasync.Delete:
261+
p.Log.Debugf(" - DEL %s", key)
262+
delete(p.localKVs, key)
263+
case datasync.Put:
264+
p.Log.Debugf(" - PUT %s", key)
265+
p.localKVs[key] = change
266+
}
267+
}
268+
changeChan <- e
269+
270+
case e := <-partResync:
271+
p.Log.Debugf("LOCAL watcher got RESYNC part, sending to aggregated")
272+
273+
p.localKVs = map[string]datasync.KeyVal{}
274+
for _, iterator := range e.GetValues() {
275+
for {
276+
kv, allReceived := iterator.GetNext()
277+
if allReceived {
278+
break
279+
}
280+
281+
key := kv.GetKey()
282+
p.localKVs[key] = kv
283+
}
284+
}
285+
p.Log.Debugf("LOCAL watcher resynced %d keyvals", len(p.localKVs))
286+
e.Done(nil)
287+
288+
p.Log.Debug("LOCAL watcher calling RESYNC")
289+
p.Resync.DoResync()
290+
}
291+
}
292+
}
293+
294+
// WatchRegistration is adapter that allows multiple
295+
// registrations (WatchRegistration) to be aggregated in one.
296+
// Close operation is applied collectively to all included registration.
297+
type WatchRegistration struct {
298+
Registrations []datasync.WatchRegistration
299+
}
300+
301+
// Register new key for all available aggregator objects. Call Register(keyPrefix) on specific registration
302+
// to add the key from that registration only
303+
func (wa *WatchRegistration) Register(resyncName, keyPrefix string) error {
304+
for _, registration := range wa.Registrations {
305+
if err := registration.Register(resyncName, keyPrefix); err != nil {
306+
logging.DefaultLogger.Warnf("aggregated register failed: %v", err)
307+
}
308+
}
309+
310+
return nil
311+
}
312+
313+
// Unregister closed registration of specific key under all available aggregator objects.
314+
// Call Unregister(keyPrefix) on specific registration to remove the key from that registration only
315+
func (wa *WatchRegistration) Unregister(keyPrefix string) error {
316+
for _, registration := range wa.Registrations {
317+
if err := registration.Unregister(keyPrefix); err != nil {
318+
logging.DefaultLogger.Warnf("aggregated unregister failed: %v", err)
319+
}
320+
}
321+
322+
return nil
323+
}
324+
325+
// Close every registration under the aggregator.
326+
// This function implements WatchRegistration.Close().
327+
func (wa *WatchRegistration) Close() error {
328+
return safeclose.Close(wa.Registrations)
329+
}

0 commit comments

Comments
 (0)