
Commit 6860926
Merge pull request #459 from freehan/neg-gc
Harden NEG GC
k8s-ci-robot authored Sep 13, 2018
2 parents ca3a07b + d3654a2 commit 6860926
Showing 5 changed files with 22 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
@@ -184,7 +184,7 @@ func runControllers(ctx *context.ControllerContext) {
 
 	if ctx.NEGEnabled {
 		// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
-		negController := neg.NewController(neg.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod)
+		negController := neg.NewController(neg.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod)
 		go negController.Run(stopCh)
 		glog.V(0).Infof("negController started")
 	}
3 changes: 3 additions & 0 deletions pkg/flags/flags.go
@@ -81,6 +81,7 @@ var (
 		WatchNamespace      string
 		NodePortRanges      PortRanges
 		EnableBackendConfig bool
+		NegGCPeriod         time.Duration
 
 		LeaderElection LeaderElectionConfiguration
 	}{}
@@ -215,6 +216,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
 		`This flag is deprecated. Use -v to control verbosity.`)
 	flag.Bool("use-real-cloud", false,
 		`This flag has been deprecated and no longer has any effect.`)
+	flag.DurationVar(&F.NegGCPeriod, "neg-gc-period", 120*time.Second,
+		`Relist and garbage collect NEGs this often.`)
 }
 
 type RateLimitSpecs struct {
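Note: flag.DurationVar is from Go's standard flag package and accepts duration strings such as 120s or 2m on the command line. A minimal standalone sketch of the registration above (the binary name in the usage comment is made up):

package main

import (
	"flag"
	"fmt"
	"time"
)

var negGCPeriod time.Duration

func main() {
	// Mirrors the registration in pkg/flags/flags.go: a time.Duration
	// flag with a 120-second default.
	flag.DurationVar(&negGCPeriod, "neg-gc-period", 120*time.Second,
		`Relist and garbage collect NEGs this often.`)
	flag.Parse()
	fmt.Println("NEG GC period:", negGCPeriod)
}

// Example invocation (hypothetical binary name):
//   ./glbc-demo -neg-gc-period=2m   -> NEG GC period: 2m0s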
12 changes: 10 additions & 2 deletions pkg/neg/controller.go
@@ -50,6 +50,7 @@ func init() {
 type Controller struct {
 	manager      negSyncerManager
 	resyncPeriod time.Duration
+	gcPeriod     time.Duration
 	recorder     record.EventRecorder
 	namer        networkEndpointGroupNamer
 	zoneGetter   zoneGetter
@@ -74,7 +75,8 @@ func NewController(
 	ctx *context.ControllerContext,
 	zoneGetter zoneGetter,
 	namer networkEndpointGroupNamer,
-	resyncPeriod time.Duration) *Controller {
+	resyncPeriod time.Duration,
+	gcPeriod time.Duration) *Controller {
 	// init event recorder
 	// TODO: move event recorder initializer to main. Reuse it among controllers.
 	eventBroadcaster := record.NewBroadcaster()
@@ -91,6 +93,7 @@
 		client:       ctx.KubeClient,
 		manager:      manager,
 		resyncPeriod: resyncPeriod,
+		gcPeriod:     gcPeriod,
 		recorder:     recorder,
 		zoneGetter:   zoneGetter,
 		namer:        namer,
@@ -167,7 +170,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
 	}()
 
 	go wait.Until(c.serviceWorker, time.Second, stopCh)
-	go wait.Until(c.gc, c.resyncPeriod, stopCh)
+	go func() {
+		// Wait for gcPeriod to run the first GC
+		// This is to make sure that all services are fully processed before running GC.
+		time.Sleep(c.gcPeriod)
+		wait.Until(c.gc, c.gcPeriod, stopCh)
+	}()
 
 	<-stopCh
 }
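Note: wait.Until (from k8s.io/apimachinery/pkg/util/wait) invokes its function immediately and then once per period, so the new goroutine sleeps for one full gcPeriod first; otherwise the first GC could run before the initial service sync completes and collect NEGs that are still wanted. A minimal standalone sketch of the pattern, with made-up durations:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	gcPeriod := 2 * time.Second // stands in for c.gcPeriod
	stopCh := make(chan struct{})

	go func() {
		// Skip the immediate first invocation that wait.Until would
		// otherwise perform, giving the initial sync time to finish.
		time.Sleep(gcPeriod)
		wait.Until(func() { fmt.Println("gc pass") }, gcPeriod, stopCh)
	}()

	time.Sleep(7 * time.Second)
	close(stopCh) // stops the wait.Until loop
}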
1 change: 1 addition & 0 deletions pkg/neg/controller_test.go
@@ -60,6 +60,7 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
 		NewFakeZoneGetter(),
 		namer,
 		1*time.Second,
+		1*time.Second,
 	)
 	return controller
 }
24 changes: 7 additions & 17 deletions pkg/neg/manager.go
@@ -167,9 +167,8 @@ func (manager *syncerManager) ShutDown() {
 func (manager *syncerManager) GC() error {
 	glog.V(2).Infof("Start NEG garbage collection.")
 	defer glog.V(2).Infof("NEG garbage collection finished.")
-	for _, key := range manager.getAllStoppedSyncerKeys() {
-		manager.garbageCollectSyncer(key)
-	}
+	// Garbage collect Syncers
+	manager.garbageCollectSyncer()
 
 	// Garbage collect NEGs
 	if err := manager.garbageCollectNEG(); err != nil {
@@ -178,24 +177,15 @@
 	return nil
 }
 
-func (manager *syncerManager) garbageCollectSyncer(key servicePort) {
+// garbageCollectSyncer removes stopped syncer from syncerMap
+func (manager *syncerManager) garbageCollectSyncer() {
 	manager.mu.Lock()
 	defer manager.mu.Unlock()
-	if manager.syncerMap[key].IsStopped() && !manager.syncerMap[key].IsShuttingDown() {
-		delete(manager.syncerMap, key)
-	}
-}
-
-func (manager *syncerManager) getAllStoppedSyncerKeys() []servicePort {
-	manager.mu.Lock()
-	defer manager.mu.Unlock()
-	ret := []servicePort{}
 	for key, syncer := range manager.syncerMap {
-		if syncer.IsStopped() {
-			ret = append(ret, key)
+		if syncer.IsStopped() && !syncer.IsShuttingDown() {
+			delete(manager.syncerMap, key)
 		}
 	}
-	return ret
 }
 
 func (manager *syncerManager) garbageCollectNEG() error {
@@ -219,7 +209,7 @@ func (manager *syncerManager) garbageCollectNEG() error {
 	manager.mu.Lock()
 	defer manager.mu.Unlock()
 	for key, ports := range manager.svcPortMap {
-		for sp, _ := range ports {
+		for sp := range ports {
 			name := manager.namer.NEG(key.namespace, key.name, sp)
 			negNames.Delete(name)
 		}
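Note: the old code took the lock twice, once in getAllStoppedSyncerKeys to collect stopped keys and then once per key in garbageCollectSyncer, so the map could in principle change between the two acquisitions, and the collection pass ignored IsShuttingDown. The rewrite does the check and the delete in a single pass under one lock. A standalone sketch of that pattern, with toy stand-in types (hypothetical; the real syncer interface lives in pkg/neg):

package main

import (
	"fmt"
	"sync"
)

// Toy stand-in for the real syncer type.
type fakeSyncer struct{ stopped, shuttingDown bool }

func (s *fakeSyncer) IsStopped() bool      { return s.stopped }
func (s *fakeSyncer) IsShuttingDown() bool { return s.shuttingDown }

type fakeManager struct {
	mu        sync.Mutex
	syncerMap map[string]*fakeSyncer
}

// Mirrors the hardened garbageCollectSyncer: the stopped/shutting-down
// check and the delete happen under a single lock acquisition.
func (m *fakeManager) garbageCollectSyncer() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for key, s := range m.syncerMap {
		if s.IsStopped() && !s.IsShuttingDown() {
			// Deleting the current entry while ranging over a map is safe in Go.
			delete(m.syncerMap, key)
		}
	}
}

func main() {
	m := &fakeManager{syncerMap: map[string]*fakeSyncer{
		"default/svc:80":  {stopped: true},
		"default/svc:443": {stopped: true, shuttingDown: true},
	}}
	m.garbageCollectSyncer()
	fmt.Println(len(m.syncerMap)) // 1: the shutting-down syncer survives
}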
