Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak in stats collector #3069

Merged
merged 1 commit into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ func (dg *dockerGoClient) APIVersion() (dockerclient.DockerVersion, error) {
func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeout time.Duration) (<-chan *types.StatsJSON, <-chan error) {
subCtx, cancelRequest := context.WithCancel(ctx)

errC := make(chan error)
errC := make(chan error, 1)
statsC := make(chan *types.StatsJSON)
client, err := dg.sdkDockerClient()
if err != nil {
Expand Down Expand Up @@ -1487,7 +1487,12 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
return
}

statsC <- data
select {
case <-ctx.Done():
return
case statsC <- data:
}

data = new(types.StatsJSON)
}
}()
Expand All @@ -1504,8 +1509,11 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
errC <- err
return
}
statsC <- stats

select {
case <-ctx.Done():
return
case statsC <- stats:
}
// sleeping here jitters the time at which the ticker is created, so that
// containers do not synchronize on calling the docker stats api.
// the max sleep is 80% of the polling interval so that we have a chance to
Expand All @@ -1519,7 +1527,11 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
errC <- err
return
}
statsC <- stats
select {
case <-ctx.Done():
return
case statsC <- stats:
}
}
}()
}
Expand Down
16 changes: 14 additions & 2 deletions agent/dockerclient/dockerapi/docker_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,12 @@ func TestStatsNormalExit(t *testing.T) {

assert.Equal(t, uint64(50), newStat.MemoryStats.Usage)
assert.Equal(t, uint64(100), newStat.CPUStats.SystemUsage)

// stop container stats
cancel()
// verify stats chan was closed to avoid goroutine leaks
_, ok := <-stats
assert.False(t, ok, "stats channel was not properly closed")
}

func TestStatsErrorReading(t *testing.T) {
Expand All @@ -1349,9 +1355,12 @@ func TestStatsErrorReading(t *testing.T) {
}, errors.New("test error"))
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, errC := client.Stats(ctx, "foo", dockerclient.StatsInactivityTimeout)
statsC, errC := client.Stats(ctx, "foo", dockerclient.StatsInactivityTimeout)

assert.Error(t, <-errC)
// verify stats chan was closed to avoid goroutine leaks
_, ok := <-statsC
assert.False(t, ok, "stats channel was not properly closed")
}

func TestStatsErrorDecoding(t *testing.T) {
Expand All @@ -1365,8 +1374,11 @@ func TestStatsErrorDecoding(t *testing.T) {
}, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, errC := client.Stats(ctx, "foo", dockerclient.StatsInactivityTimeout)
statsC, errC := client.Stats(ctx, "foo", dockerclient.StatsInactivityTimeout)
assert.Error(t, <-errC)
// verify stats chan was closed to avoid goroutine leaks
_, ok := <-statsC
assert.False(t, ok, "stats channel was not properly closed")
}

func TestStatsClientError(t *testing.T) {
Expand Down