Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport of "Fix memory leak in service mirror" to 2.12 branch #11345

Merged
merged 9 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ jobs:
runs-on: ubuntu-20.04
timeout-minutes: 15
steps:
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.17'
go-version: '1.19'
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down Expand Up @@ -262,9 +262,9 @@ jobs:
runs-on: ubuntu-20.04
timeout-minutes: 15
steps:
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.17'
go-version: '1.19'
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down Expand Up @@ -303,9 +303,9 @@ jobs:
runs-on: ubuntu-20.04
timeout-minutes: 15
steps:
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.17'
go-version: '1.19'
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.18'
go-version: '1.19'
- name: Download image archives
uses: actions/download-artifact@fb598a63ae348fa914e94cd0ff38f362e927b741
with:
Expand Down Expand Up @@ -178,9 +178,9 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/setup-go@268d8c0ca0432bb2cf416faae41297df9d262d7f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.18'
go-version: '1.19'
- name: Set environment variables from scripts
run: |
TAG='${{ needs.tag.outputs.tag }}'
Expand All @@ -202,9 +202,9 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8
- uses: actions/setup-go@c4a742cab115ed795e34d4513e2cf7d472deb55f
- uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568
with:
go-version: '1.18'
go-version: '1.19'
- uses: docker/setup-buildx-action@94ab11c41e45d028884a99163086648e898eed25
- name: Pull linkerd binary
run: |
Expand Down
1 change: 0 additions & 1 deletion bin/install-deps
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ CGO_ENABLED=0 GOOS=linux GOARCH=$arch go install -mod=readonly \
k8s.io/client-go/plugin/pkg/client/auth/azure \
k8s.io/client-go/plugin/pkg/client/auth/gcp \
k8s.io/client-go/plugin/pkg/client/auth/oidc \
k8s.io/client-go/plugin/pkg/client/auth/openstack \
k8s.io/client-go/rest \
k8s.io/client-go/rest/watch \
k8s.io/client-go/tools/auth \
Expand Down
20 changes: 16 additions & 4 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,22 @@ func NewServer(
return nil, err
}

endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices)
opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
servers := watcher.NewServerWatcher(k8sAPI, log)
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices)
if err != nil {
return nil, err
}
opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
if err != nil {
return nil, err
}
profiles, err := watcher.NewProfileWatcher(k8sAPI, log)
if err != nil {
return nil, err
}
servers, err := watcher.NewServerWatcher(k8sAPI, log)
if err != nil {
return nil, err
}

srv := server{
pb.UnimplementedDestinationServer{},
Expand Down
20 changes: 16 additions & 4 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,22 @@ spec:
t.Fatalf("initializeIndexers returned an error: %s", err)
}

endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, false)
opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
servers := watcher.NewServerWatcher(k8sAPI, log)
endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, log, false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}
opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
if err != nil {
t.Fatalf("can't create opaque ports watcher: %s", err)
}
profiles, err := watcher.NewProfileWatcher(k8sAPI, log)
if err != nil {
t.Fatalf("can't create profile watcher: %s", err)
}
servers, err := watcher.NewServerWatcher(k8sAPI, log)
if err != nil {
t.Fatalf("can't create Server watcher: %s", err)
}

// Sync after creating watchers so that the the indexers added get updated
// properly
Expand Down
24 changes: 18 additions & 6 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var undefinedEndpointPort = Port(0)
// NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
// k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will
// watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) *EndpointsWatcher {
func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) (*EndpointsWatcher, error) {
ew := &EndpointsWatcher{
publishers: make(map[ServiceID]*servicePublisher),
k8sAPI: k8sAPI,
Expand All @@ -144,34 +144,46 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlic
}),
}

k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addService,
DeleteFunc: ew.deleteService,
UpdateFunc: func(_, obj interface{}) { ew.addService(obj) },
})
if err != nil {
return nil, err
}

k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addServer,
DeleteFunc: ew.deleteServer,
UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) },
})
if err != nil {
return nil, err
}

if ew.enableEndpointSlices {
ew.log.Debugf("Watching EndpointSlice resources")
k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addEndpointSlice,
DeleteFunc: ew.deleteEndpointSlice,
UpdateFunc: ew.updateEndpointSlice,
})
if err != nil {
return nil, err
}
} else {
ew.log.Debugf("Watching Endpoints resources")
k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addEndpoints,
DeleteFunc: ew.deleteEndpoints,
UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) },
})
if err != nil {
return nil, err
}
}
return ew
return ew, nil
}

////////////////////////
Expand Down
40 changes: 32 additions & 8 deletions controller/api/destination/watcher/endpoints_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1281,7 +1284,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1398,7 +1404,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1519,7 +1528,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1746,7 +1758,10 @@ subsets:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -1906,7 +1921,10 @@ subsets:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -2029,7 +2047,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -2122,7 +2143,10 @@ status:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true)
if err != nil {
t.Fatalf("can't create Endpoints watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down
10 changes: 7 additions & 3 deletions controller/api/destination/watcher/opaque_ports_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,23 @@ type (

// NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for
// k8sAPI for service changes.
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) *OpaquePortsWatcher {
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
opw := &OpaquePortsWatcher{
subscriptions: make(map[ServiceID]*svcSubscriptions),
k8sAPI: k8sAPI,
log: log.WithField("component", "opaque-ports-watcher"),
defaultOpaquePorts: opaquePorts,
}
k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: opw.addService,
DeleteFunc: opw.deleteService,
UpdateFunc: func(_, obj interface{}) { opw.addService(obj) },
})
return opw
if err != nil {
return nil, err
}

return opw, nil
}

// Subscribe subscribes a listener to a service; each time the service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func TestOpaquePortsWatcher(t *testing.T) {
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
watcher := NewOpaquePortsWatcher(k8sAPI, logging.WithField("test", t.Name()), defaultOpaquePorts)
watcher, err := NewOpaquePortsWatcher(k8sAPI, logging.WithField("test", t.Name()), defaultOpaquePorts)
if err != nil {
t.Fatalf("can't create opaque ports watcher: %s", err)
}
k8sAPI.Sync(nil)
listener := newTestOpaquePortsListener()
watcher.Subscribe(tt.service, listener)
Expand Down
9 changes: 6 additions & 3 deletions controller/api/destination/watcher/profile_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,25 @@ var profileVecs = newMetricsVecs("profile", []string{"namespace", "profile"})

// NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for
// service profile changes.
func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) *ProfileWatcher {
func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error) {
watcher := &ProfileWatcher{
profileLister: k8sAPI.SP().Lister(),
profiles: make(map[ProfileID]*profilePublisher),
log: log.WithField("component", "profile-watcher"),
}

k8sAPI.SP().Informer().AddEventHandler(
_, err := k8sAPI.SP().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: watcher.addProfile,
UpdateFunc: watcher.updateProfile,
DeleteFunc: watcher.deleteProfile,
},
)
if err != nil {
return nil, err
}

return watcher
return watcher, nil
}

//////////////////////
Expand Down
10 changes: 8 additions & 2 deletions controller/api/destination/watcher/profile_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func TestProfileWatcherUpdates(t *testing.T) {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
watcher, err := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
if err != nil {
t.Fatalf("can't create profile watcher: %s", err)
}

k8sAPI.Sync(nil)

Expand Down Expand Up @@ -136,7 +139,10 @@ func TestProfileWatcherDeletes(t *testing.T) {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

watcher := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
watcher, err := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name()))
if err != nil {
t.Fatalf("can't create profile watcher: %s", err)
}
k8sAPI.Sync(nil)

listener := NewDeletingProfileListener()
Expand Down
Loading
Loading