Skip to content

Commit 6766786

Browse files
authored
Remove forced delay for linux interface notifications (#1742)
1 parent 65d5192 commit 6766786

File tree

3 files changed

+22
-69
lines changed

3 files changed

+22
-69
lines changed

plugins/configurator/configurator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,15 @@ func (svc *configuratorServer) Update(ctx context.Context, req *pb.UpdateRequest
126126
}
127127

128128
if req.WaitDone {
129+
waitStart := time.Now()
129130
var pendingKeys []string
130131
for _, res := range results {
131132
if res.Status.GetState() == kvscheduler.ValueState_PENDING {
132133
pendingKeys = append(pendingKeys, res.Key)
133134
}
134135
}
135136
if len(pendingKeys) > 0 {
136-
waitStart := time.Now()
137-
svc.log.Infof("waiting for %d pending keys to be done", len(pendingKeys))
137+
svc.log.Infof("waiting for %d pending keys", len(pendingKeys))
138138
for len(pendingKeys) > 0 {
139139
select {
140140
case <-time.After(waitDoneCheckPendingPeriod):
@@ -144,10 +144,10 @@ func (svc *configuratorServer) Update(ctx context.Context, req *pb.UpdateRequest
144144
return nil, ctx.Err()
145145
}
146146
}
147-
svc.log.Infof("finished waiting for pending keys to be done (took %v)", time.Since(waitStart))
148147
} else {
149148
svc.log.Debugf("no pendings keys to wait for")
150149
}
150+
svc.log.Infof("finished waiting for done (took %v)", time.Since(waitStart))
151151
}
152152

153153
svc.log.Debugf("config update finished with %d results", len(results))

plugins/linux/ifplugin/descriptor/watcher.go

+18-66
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"strings"
2020
"sync"
21-
"time"
2221

2322
"github.com/golang/protobuf/proto"
2423
prototypes "github.com/golang/protobuf/ptypes/empty"
@@ -35,11 +34,6 @@ const (
3534
// InterfaceWatcherName is the name of the descriptor watching Linux interfaces
3635
// in the default namespace.
3736
InterfaceWatcherName = "linux-interface-watcher"
38-
39-
// notificationDelay specifies how long to delay notification when interface changes.
40-
// Typically interface is created in multiple stages and we do not want to notify
41-
// scheduler about intermediate states.
42-
notificationDelay = 500 * time.Millisecond
4337
)
4438

4539
// InterfaceWatcher watches default namespace for newly added/removed Linux interfaces.
@@ -58,9 +52,6 @@ type InterfaceWatcher struct {
5852
ifacesMu sync.Mutex
5953
ifaces map[string]struct{}
6054

61-
// interface changes delayed to give Linux time to "finalize" them
62-
pendingIntfs map[string]bool // interface name -> exists?
63-
6455
// conditional variable to check if the list of interfaces is in-sync with
6556
// Linux network stack
6657
intfsInSync bool
@@ -74,13 +65,12 @@ type InterfaceWatcher struct {
7465
// NewInterfaceWatcher creates a new instance of the Interface Watcher.
7566
func NewInterfaceWatcher(kvscheduler kvs.KVScheduler, ifHandler linuxcalls.NetlinkAPI, log logging.PluginLogger) *InterfaceWatcher {
7667
descriptor := &InterfaceWatcher{
77-
log: log.NewLogger("if-watcher"),
78-
kvscheduler: kvscheduler,
79-
ifHandler: ifHandler,
80-
ifaces: make(map[string]struct{}),
81-
pendingIntfs: make(map[string]bool),
82-
notifCh: make(chan netlink.LinkUpdate),
83-
doneCh: make(chan struct{}),
68+
log: log.NewLogger("if-watcher"),
69+
kvscheduler: kvscheduler,
70+
ifHandler: ifHandler,
71+
ifaces: make(map[string]struct{}),
72+
notifCh: make(chan netlink.LinkUpdate),
73+
doneCh: make(chan struct{}),
8474
}
8575
descriptor.intfsInSyncCond = sync.NewCond(&descriptor.ifacesMu)
8676
descriptor.ctx, descriptor.cancel = context.WithCancel(context.Background())
@@ -185,59 +175,14 @@ func (w *InterfaceWatcher) processLinkNotification(linkUpdate netlink.LinkUpdate
185175
defer w.ifacesMu.Unlock()
186176

187177
ifName := linkUpdate.Attrs().Name
188-
isEnabled := linkUpdate.Attrs().OperState != netlink.OperDown &&
189-
linkUpdate.Attrs().OperState != netlink.OperNotPresent
190-
191-
_, isPendingNotif := w.pendingIntfs[ifName]
192-
if isPendingNotif {
193-
// notification for this interface is already scheduled, just update the state
194-
w.pendingIntfs[ifName] = isEnabled
195-
return
196-
}
178+
isUp := isLinkUp(linkUpdate)
197179

198-
if !w.needsUpdate(ifName, isEnabled) {
180+
if !w.needsUpdate(ifName, isUp) {
199181
// ignore notification if the interface admin status remained the same
200182
return
201183
}
202184

203-
if isEnabled {
204-
// do not notify until interface is truly finished
205-
w.pendingIntfs[ifName] = true
206-
w.wg.Add(1)
207-
go w.delayNotification(ifName)
208-
return
209-
}
210-
211-
// notification about removed interface is propagated immediately
212-
w.notifyScheduler(ifName, false)
213-
}
214-
215-
// delayNotification delays notification about enabled interface - typically
216-
// interface is created in multiple stages and we do not want to notify scheduler
217-
// about intermediate states.
218-
func (w *InterfaceWatcher) delayNotification(ifName string) {
219-
defer w.wg.Done()
220-
221-
select {
222-
case <-w.ctx.Done():
223-
return
224-
case <-time.After(notificationDelay):
225-
w.applyDelayedNotification(ifName)
226-
}
227-
}
228-
229-
// applyDelayedNotification applies delayed interface notification.
230-
func (w *InterfaceWatcher) applyDelayedNotification(ifName string) {
231-
w.ifacesMu.Lock()
232-
defer w.ifacesMu.Unlock()
233-
234-
// in the meantime the status may have changed and may not require update anymore
235-
isEnabled := w.pendingIntfs[ifName]
236-
if w.needsUpdate(ifName, isEnabled) {
237-
w.notifyScheduler(ifName, isEnabled)
238-
}
239-
240-
delete(w.pendingIntfs, ifName)
185+
w.notifyScheduler(ifName, isUp)
241186
}
242187

243188
// notifyScheduler notifies scheduler about interface change.
@@ -251,14 +196,21 @@ func (w *InterfaceWatcher) notifyScheduler(ifName string, enabled bool) {
251196
delete(w.ifaces, ifName)
252197
}
253198

254-
w.kvscheduler.PushSBNotification(kvs.KVWithMetadata{
199+
if err := w.kvscheduler.PushSBNotification(kvs.KVWithMetadata{
255200
Key: ifmodel.InterfaceHostNameKey(ifName),
256201
Value: value,
257202
Metadata: nil,
258-
})
203+
}); err != nil {
204+
w.log.Warnf("pushing SB notification failed: %v", err)
205+
}
259206
}
260207

261208
func (w *InterfaceWatcher) needsUpdate(ifName string, isEnabled bool) bool {
262209
_, wasEnabled := w.ifaces[ifName]
263210
return isEnabled != wasEnabled
264211
}
212+
213+
func isLinkUp(update netlink.LinkUpdate) bool {
214+
return update.Attrs().OperState != netlink.OperDown &&
215+
update.Attrs().OperState != netlink.OperNotPresent
216+
}

plugins/linux/nsplugin/ns_plugin.go

+1
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ func (p *NsPlugin) getOrCreateNs(ctx nsLinuxcalls.NamespaceMgmtCtx, ns *nsmodel.
248248
case nsmodel.NetNamespace_NSID:
249249
nsHandle, err = p.sysHandler.GetNamespaceFromName(ns.Reference)
250250
if err != nil {
251+
p.Log.Warnf("GetNamespaceFromName %s failed: %v", ns.Reference, err)
251252
// Create named namespace if it doesn't exist yet.
252253
_, err = p.namedNsHandler.CreateNamedNetNs(ctx, ns.Reference)
253254
if err != nil {

0 commit comments

Comments
 (0)