Skip to content

Commit 1260faf

Browse files
authored
[chore][exporter/googlecloudpubsub] Fix goroutines leak (#36591)
#### Description Fixes goroutines leak by properly closing the underlying gRPC client which is only when we're using an insecure custom endpoint. Enables goleak in tests. #### Link to tracking issue Related to #30438
1 parent 4752d81 commit 1260faf

9 files changed

+242
-62
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
change_type: bug_fix
2+
component: googlecloudpubsubexporter
3+
note: Fix a goroutine leak during shutdown.
4+
issues: [30438]
5+
subtext: |
6+
A goroutine leak was found in the googlecloudpubsubexporter.
7+
The goroutine leak was caused by the exporter not closing the underlying created gRPC client when using an insecure custom endpoint.
8+
change_logs: []

exporter/googlecloudpubsubexporter/exporter.go

+8-30
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,20 @@ import (
1010
"fmt"
1111
"time"
1212

13-
pubsub "cloud.google.com/go/pubsub/apiv1"
1413
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
1514
"github.com/google/uuid"
1615
"go.opentelemetry.io/collector/component"
1716
"go.opentelemetry.io/collector/pdata/plog"
1817
"go.opentelemetry.io/collector/pdata/pmetric"
1918
"go.opentelemetry.io/collector/pdata/ptrace"
2019
"go.uber.org/zap"
21-
"google.golang.org/api/option"
22-
"google.golang.org/grpc"
23-
"google.golang.org/grpc/credentials/insecure"
2420
)
2521

2622
const name = "googlecloudpubsub"
2723

2824
type pubsubExporter struct {
2925
logger *zap.Logger
30-
client *pubsub.PublisherClient
26+
client publisherClient
3127
cancel context.CancelFunc
3228
userAgent string
3329
ceSource string
@@ -71,8 +67,7 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error {
7167
ctx, ex.cancel = context.WithCancel(ctx)
7268

7369
if ex.client == nil {
74-
copts := ex.generateClientOptions()
75-
client, err := pubsub.NewPublisherClient(ctx, copts...)
70+
client, err := newPublisherClient(ctx, ex.config, ex.userAgent)
7671
if err != nil {
7772
return fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
7873
}
@@ -82,31 +77,14 @@ func (ex *pubsubExporter) start(ctx context.Context, _ component.Host) error {
8277
return nil
8378
}
8479

85-
func (ex *pubsubExporter) shutdown(context.Context) error {
86-
if ex.client != nil {
87-
ex.client.Close()
88-
ex.client = nil
80+
func (ex *pubsubExporter) shutdown(_ context.Context) error {
81+
if ex.client == nil {
82+
return nil
8983
}
90-
return nil
91-
}
9284

93-
func (ex *pubsubExporter) generateClientOptions() (copts []option.ClientOption) {
94-
if ex.userAgent != "" {
95-
copts = append(copts, option.WithUserAgent(ex.userAgent))
96-
}
97-
if ex.config.Endpoint != "" {
98-
if ex.config.Insecure {
99-
var dialOpts []grpc.DialOption
100-
if ex.userAgent != "" {
101-
dialOpts = append(dialOpts, grpc.WithUserAgent(ex.userAgent))
102-
}
103-
conn, _ := grpc.NewClient(ex.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
104-
copts = append(copts, option.WithGRPCConn(conn))
105-
} else {
106-
copts = append(copts, option.WithEndpoint(ex.config.Endpoint))
107-
}
108-
}
109-
return copts
85+
client := ex.client
86+
ex.client = nil
87+
return client.Close()
11088
}
11189

11290
func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error {

exporter/googlecloudpubsubexporter/exporter_test.go

-27
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,13 @@ import (
1616
"go.opentelemetry.io/collector/pdata/plog"
1717
"go.opentelemetry.io/collector/pdata/pmetric"
1818
"go.opentelemetry.io/collector/pdata/ptrace"
19-
"google.golang.org/api/option"
2019
)
2120

2221
func TestName(t *testing.T) {
2322
exporter := &pubsubExporter{}
2423
assert.Equal(t, "googlecloudpubsub", exporter.Name())
2524
}
2625

27-
func TestGenerateClientOptions(t *testing.T) {
28-
// Start a fake server running locally.
29-
srv := pstest.NewServer()
30-
defer srv.Close()
31-
factory := NewFactory()
32-
cfg := factory.CreateDefaultConfig()
33-
exporterConfig := cfg.(*Config)
34-
exporterConfig.Endpoint = srv.Addr
35-
exporterConfig.UserAgent = "test-user-agent"
36-
exporterConfig.Insecure = true
37-
exporterConfig.ProjectID = "my-project"
38-
exporterConfig.Topic = "projects/my-project/topics/otlp"
39-
exporterConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
40-
Timeout: 12 * time.Second,
41-
}
42-
exporter := ensureExporter(exportertest.NewNopSettings(), exporterConfig)
43-
44-
options := exporter.generateClientOptions()
45-
assert.Equal(t, option.WithUserAgent("test-user-agent"), options[0])
46-
47-
exporter.config.Insecure = false
48-
options = exporter.generateClientOptions()
49-
assert.Equal(t, option.WithUserAgent("test-user-agent"), options[0])
50-
assert.Equal(t, option.WithEndpoint(srv.Addr), options[1])
51-
}
52-
5326
func TestExporterDefaultSettings(t *testing.T) {
5427
ctx := context.Background()
5528
// Start a fake server running locally.

exporter/googlecloudpubsubexporter/generated_package_test.go

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/googlecloudpubsubexporter/go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.22.0
55
require (
66
cloud.google.com/go/pubsub v1.45.1
77
github.com/google/uuid v1.6.0
8+
github.com/googleapis/gax-go/v2 v2.13.0
89
github.com/stretchr/testify v1.10.0
910
go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99
1011
go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99
@@ -14,6 +15,7 @@ require (
1415
go.opentelemetry.io/collector/exporter v0.114.1-0.20241202231142-b9ff1bc54c99
1516
go.opentelemetry.io/collector/exporter/exportertest v0.114.1-0.20241202231142-b9ff1bc54c99
1617
go.opentelemetry.io/collector/pdata v1.20.1-0.20241202231142-b9ff1bc54c99
18+
go.uber.org/goleak v1.3.0
1719
go.uber.org/zap v1.27.0
1820
google.golang.org/api v0.205.0
1921
google.golang.org/grpc v1.67.1
@@ -36,7 +38,6 @@ require (
3638
github.com/google/go-cmp v0.6.0 // indirect
3739
github.com/google/s2a-go v0.1.8 // indirect
3840
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
39-
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
4041
github.com/hashicorp/go-version v1.7.0 // indirect
4142
github.com/json-iterator/go v1.1.12 // indirect
4243
github.com/knadh/koanf/maps v0.1.1 // indirect

exporter/googlecloudpubsubexporter/metadata.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@ status:
1212
tests:
1313
skip_lifecycle: true
1414
goleak:
15-
skip: true
15+
ignore:
16+
top:
17+
# See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
18+
- "go.opencensus.io/stats/view.(*worker).start"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package googlecloudpubsubexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudpubsubexporter"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
pubsub "cloud.google.com/go/pubsub/apiv1"
11+
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
12+
"github.com/googleapis/gax-go/v2"
13+
"google.golang.org/api/option"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/credentials/insecure"
16+
)
17+
18+
// publisherClient subset of `pubsub.PublisherClient`
19+
type publisherClient interface {
20+
Close() error
21+
Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error)
22+
}
23+
24+
// wrappedPublisherClient allows to override the close function
25+
type wrappedPublisherClient struct {
26+
publisherClient
27+
closeFn func() error
28+
}
29+
30+
func (c *wrappedPublisherClient) Close() error {
31+
if c.closeFn != nil {
32+
return c.closeFn()
33+
}
34+
return c.publisherClient.Close()
35+
}
36+
37+
func newPublisherClient(ctx context.Context, config *Config, userAgent string) (publisherClient, error) {
38+
clientOptions, closeFn, err := generateClientOptions(config, userAgent)
39+
if err != nil {
40+
return nil, fmt.Errorf("failed preparing the gRPC client options to PubSub: %w", err)
41+
}
42+
43+
client, err := pubsub.NewPublisherClient(ctx, clientOptions...)
44+
if err != nil {
45+
return nil, fmt.Errorf("failed creating the gRPC client to PubSub: %w", err)
46+
}
47+
48+
if closeFn == nil {
49+
return client, nil
50+
}
51+
52+
return &wrappedPublisherClient{
53+
publisherClient: client,
54+
closeFn: closeFn,
55+
}, nil
56+
}
57+
58+
func generateClientOptions(config *Config, userAgent string) ([]option.ClientOption, func() error, error) {
59+
var copts []option.ClientOption
60+
var closeFn func() error
61+
62+
if userAgent != "" {
63+
copts = append(copts, option.WithUserAgent(userAgent))
64+
}
65+
if config.Endpoint != "" {
66+
if config.Insecure {
67+
var dialOpts []grpc.DialOption
68+
if userAgent != "" {
69+
dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
70+
}
71+
client, err := grpc.NewClient(config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
72+
if err != nil {
73+
return nil, nil, err
74+
}
75+
copts = append(copts, option.WithGRPCConn(client))
76+
closeFn = client.Close // we need to be able to properly close the grpc client otherwise it'll leak goroutines
77+
} else {
78+
copts = append(copts, option.WithEndpoint(config.Endpoint))
79+
}
80+
}
81+
return copts, closeFn, nil
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package googlecloudpubsubexporter
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
pubsub "cloud.google.com/go/pubsub/apiv1"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"google.golang.org/api/option"
14+
)
15+
16+
func TestGenerateClientOptions(t *testing.T) {
17+
factory := NewFactory()
18+
19+
t.Run("defaults", func(t *testing.T) {
20+
cfg := factory.CreateDefaultConfig().(*Config)
21+
cfg.ProjectID = "my-project"
22+
cfg.Topic = "projects/my-project/topics/otlp"
23+
24+
require.NoError(t, cfg.Validate())
25+
26+
gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 6789")
27+
assert.NoError(t, err)
28+
assert.Empty(t, closeConnFn)
29+
30+
expectedOptions := []option.ClientOption{
31+
option.WithUserAgent("test-user-agent 6789"),
32+
}
33+
assert.ElementsMatch(t, expectedOptions, gotOptions)
34+
})
35+
36+
t.Run("secure custom endpoint", func(t *testing.T) {
37+
cfg := factory.CreateDefaultConfig().(*Config)
38+
cfg.ProjectID = "my-project"
39+
cfg.Topic = "projects/my-project/topics/otlp"
40+
cfg.Endpoint = "defg"
41+
42+
require.NoError(t, cfg.Validate())
43+
44+
gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 4321")
45+
assert.NoError(t, err)
46+
assert.Empty(t, closeConnFn)
47+
48+
expectedOptions := []option.ClientOption{
49+
option.WithUserAgent("test-user-agent 4321"),
50+
option.WithEndpoint("defg"),
51+
}
52+
assert.ElementsMatch(t, expectedOptions, gotOptions)
53+
})
54+
55+
t.Run("insecure endpoint", func(t *testing.T) {
56+
cfg := factory.CreateDefaultConfig().(*Config)
57+
cfg.ProjectID = "my-project"
58+
cfg.Topic = "projects/my-project/topics/otlp"
59+
cfg.Endpoint = "abcd"
60+
cfg.Insecure = true
61+
62+
require.NoError(t, cfg.Validate())
63+
64+
gotOptions, closeConnFn, err := generateClientOptions(cfg, "test-user-agent 1234")
65+
assert.NoError(t, err)
66+
assert.NotEmpty(t, closeConnFn)
67+
assert.NoError(t, closeConnFn())
68+
69+
require.Len(t, gotOptions, 2)
70+
assert.Equal(t, option.WithUserAgent("test-user-agent 1234"), gotOptions[0])
71+
assert.IsType(t, option.WithGRPCConn(nil), gotOptions[1])
72+
})
73+
}
74+
75+
func TestNewPublisherClient(t *testing.T) {
76+
// The publisher client checks for credentials during init
77+
t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "testdata/gcp-fake-creds.json")
78+
79+
ctx := context.Background()
80+
factory := NewFactory()
81+
82+
t.Run("defaults", func(t *testing.T) {
83+
cfg := factory.CreateDefaultConfig().(*Config)
84+
cfg.ProjectID = "my-project"
85+
cfg.Topic = "projects/my-project/topics/otlp"
86+
87+
require.NoError(t, cfg.Validate())
88+
89+
client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789")
90+
assert.NoError(t, err)
91+
require.NotEmpty(t, client)
92+
assert.IsType(t, &pubsub.PublisherClient{}, client)
93+
assert.NoError(t, client.Close())
94+
})
95+
96+
t.Run("secure custom endpoint", func(t *testing.T) {
97+
cfg := factory.CreateDefaultConfig().(*Config)
98+
cfg.ProjectID = "my-project"
99+
cfg.Topic = "projects/my-project/topics/otlp"
100+
cfg.Endpoint = "xyz"
101+
102+
require.NoError(t, cfg.Validate())
103+
104+
client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789")
105+
assert.NoError(t, err)
106+
require.NotEmpty(t, client)
107+
assert.IsType(t, &pubsub.PublisherClient{}, client)
108+
assert.NoError(t, client.Close())
109+
})
110+
111+
t.Run("insecure endpoint", func(t *testing.T) {
112+
cfg := factory.CreateDefaultConfig().(*Config)
113+
cfg.ProjectID = "my-project"
114+
cfg.Topic = "projects/my-project/topics/otlp"
115+
cfg.Endpoint = "abc"
116+
cfg.Insecure = true
117+
118+
require.NoError(t, cfg.Validate())
119+
120+
client, err := newPublisherClient(ctx, cfg, "test-user-agent 6789")
121+
assert.NoError(t, err)
122+
require.NotEmpty(t, client)
123+
assert.IsType(t, &wrappedPublisherClient{}, client)
124+
assert.NoError(t, client.Close())
125+
})
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"type": "service_account",
3+
"private_key_id": "abc",
4+
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n",
5+
"client_email": "[email protected]",
6+
"client_id": "123-abc.apps.googleusercontent.com",
7+
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
8+
"token_uri": "http://localhost:8080/token"
9+
}

0 commit comments

Comments
 (0)