Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/middleware-grpc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: configgrpc

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add gRPC middleware support.

# One or more tracking issues or pull requests related to the change
issues: [12603, 9591]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions config/configgrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ README](../configtls/README.md).
- [`read_buffer_size`](https://godoc.org/google.golang.org/grpc#ReadBufferSize)
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
- [`auth`](../configauth/README.md)
- [`middlewares`](../configmiddleware/README.md)

Please note that [`per_rpc_auth`](https://pkg.go.dev/google.golang.org/grpc#PerRPCCredentials) which allows the credentials to send for every RPC is now moved to become an [extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/bearertokenauthextension). Note that this feature isn't about sending the headers only during the initial connection as an `authorization` header under the `headers` would do: this is sent for every RPC performed during an established connection.

Expand Down Expand Up @@ -111,3 +112,4 @@ see [confignet README](../confignet/README.md).
- [`tls`](../configtls/README.md)
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
- [`auth`](../configauth/README.md)
- [`middlewares`](../configmiddleware/README.md)
202 changes: 202 additions & 0 deletions config/configgrpc/client_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package configgrpc

import (
"context"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmiddleware"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensionmiddleware"
"go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest"
)

// testlientMiddleware is a mock implementation of a middleware extension
type testClientMiddleware struct {
extension.Extension
extensionmiddleware.GetGRPCClientOptionsFunc
}

func newTestMiddlewareConfig(name string) configmiddleware.Config {
return configmiddleware.Config{
ID: component.MustNewID(name),
}
}

func newTestClientMiddleware(name string) extension.Extension {
return &testClientMiddleware{
Extension: extensionmiddlewaretest.NewNop(),
GetGRPCClientOptionsFunc: func() ([]grpc.DialOption, error) {
return []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
func(
ctx context.Context,
method string,
req, reply any,
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// Get existing metadata or create new metadata
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
// Clone the metadata to avoid modifying the real metadata map
md = md.Copy()
}

// Check if there's already a middleware sequence header
sequence := ""
if values := md.Get("middleware-sequence"); len(values) > 0 {
sequence = values[0]
}

// Append this middleware's ID to the sequence
if sequence == "" {
sequence = name
} else {
sequence = fmt.Sprintf("%s,%s", sequence, name)
}

// Set the updated sequence
md.Set("middleware-sequence", sequence)

// Create a new context with the updated metadata
newCtx := metadata.NewOutgoingContext(ctx, md)

// Continue the call with our updated context
return invoker(newCtx, method, req, reply, cc, opts...)
}),
}, nil
},
}
}

// TestClientMiddlewareOrdering verifies that client middleware
// interceptors are called in the right order.
func TestClientMiddlewareOrdering(t *testing.T) {
// Create a middleware tracking header that will be modified by our middleware interceptors
const middlewareTrackingHeader = "middleware-sequence"

// Create middleware extensions that will modify the metadata to track their execution order
mockMiddleware1 := newTestClientMiddleware("middleware-1")
mockMiddleware2 := newTestClientMiddleware("middleware-2")

mockExt := map[component.ID]component.Component{
component.MustNewID("middleware1"): mockMiddleware1,
component.MustNewID("middleware2"): mockMiddleware2,
}

// Start a gRPC server that will record the incoming metadata
server := &grpcTraceServer{}
srv, addr := server.startTestServer(t, ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "localhost:0",
Transport: confignet.TransportTypeTCP,
},
})
defer srv.Stop()

// Create client config with middleware extensions
clientConfig := ClientConfig{
Endpoint: addr,
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
Middlewares: []configmiddleware.Config{
newTestMiddlewareConfig("middleware1"),
newTestMiddlewareConfig("middleware2"),
},
}

// Create a test host with our mock extensions
host := &mockHost{ext: mockExt}

// Send a request using the client with middleware
resp, err := sendTestRequestWithHost(t, clientConfig, host)
require.NoError(t, err)
assert.NotNil(t, resp)

// Verify that the middleware order was respected as recorded in the metadata
ictx, ok := metadata.FromIncomingContext(server.recordedContext)
require.True(t, ok, "middleware tracking header not found in metadata")
md := ictx[middlewareTrackingHeader]
require.Len(t, md, 1, "expected exactly one middleware tracking header value")

// The sequence should be "middleware-1,middleware-2" as that's the order they were registered
expectedSequence := "middleware-1,middleware-2"
assert.Equal(t, expectedSequence, md[0])
}

// TestClientMiddlewareToClientErrors tests failure cases for the ToClient method
// specifically related to middleware resolution and API calls.
func TestClientMiddlewareToClientErrors(t *testing.T) {
tests := []struct {
name string
host component.Host
config ClientConfig
errText string
}{
{
name: "extension_not_found",
host: &mockHost{
ext: map[component.ID]component.Component{},
},
config: ClientConfig{
Endpoint: "localhost:1234",
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
Middlewares: []configmiddleware.Config{
{
ID: component.MustNewID("nonexistent"),
},
},
},
errText: "failed to resolve middleware \"nonexistent\": middleware not found",
},
{
name: "get_client_options_fails",
host: &mockHost{
ext: map[component.ID]component.Component{
component.MustNewID("errormw"): extensionmiddlewaretest.NewErr(errors.New("get options failed")),
},
},
config: ClientConfig{
Endpoint: "localhost:1234",
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
Middlewares: []configmiddleware.Config{
{
ID: component.MustNewID("errormw"),
},
},
},
errText: "get options failed",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Test creating the client with middleware errors
_, err := tc.config.ToClientConn(context.Background(), tc.host, componenttest.NewNopTelemetrySettings())
require.Error(t, err)
assert.Contains(t, err.Error(), tc.errText)
})
}
}
31 changes: 29 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configmiddleware"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
Expand Down Expand Up @@ -105,6 +106,9 @@ type ClientConfig struct {

// Auth configuration for outgoing RPCs.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Middlewares for the gRPC client.
Middlewares []configmiddleware.Config `mapstructure:"middlewares,omitempty"`
}

// NewDefaultClientConfig returns a new instance of ClientConfig with default values.
Expand Down Expand Up @@ -197,6 +201,10 @@ type ServerConfig struct {

// Include propagates the incoming connection's metadata to downstream consumers.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`

// Middlewares for the gRPC server.
Middlewares []configmiddleware.Config `mapstructure:"middlewares,omitempty"`

// prevent unkeyed literal initialization
_ struct{}
}
Expand Down Expand Up @@ -372,6 +380,15 @@ func (gcs *ClientConfig) getGrpcDialOptions(
)
}

// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
for _, middleware := range gcs.Middlewares {
middlewareOptions, err := middleware.GetGRPCClientOptions(ctx, host.GetExtensions())
if err != nil {
return nil, fmt.Errorf("failed to get gRPC client options from middleware: %w", err)
}
opts = append(opts, middlewareOptions...)
}

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcDialOptionWrapper); ok {
opts = append(opts, wrapper.opt)
Expand Down Expand Up @@ -414,19 +431,20 @@ func (grpcServerOptionWrapper) isToServerOption() {}

// ToServer returns a [grpc.Server] for the configuration.
func (gss *ServerConfig) ToServer(
_ context.Context,
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToServerOption,
) (*grpc.Server, error) {
grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts)
grpcOpts, err := gss.getGrpcServerOptions(ctx, host, settings, extraOpts)
if err != nil {
return nil, err
}
return grpc.NewServer(grpcOpts...), nil
}

func (gss *ServerConfig) getGrpcServerOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToServerOption,
Expand Down Expand Up @@ -515,6 +533,15 @@ func (gss *ServerConfig) getGrpcServerOptions(

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
for _, middleware := range gss.Middlewares {
middlewareOptions, err := middleware.GetGRPCServerOptions(ctx, host.GetExtensions())
if err != nil {
return nil, fmt.Errorf("failed to get gRPC server options from middleware: %w", err)
}
opts = append(opts, middlewareOptions...)
}

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
opts = append(opts, wrapper.opt)
Expand Down
35 changes: 31 additions & 4 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestDefaultGrpcServerSettings(t *testing.T) {
Endpoint: "0.0.0.0:1234",
},
}
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 3)
}
Expand All @@ -312,6 +312,7 @@ func TestGrpcServerExtraOption(t *testing.T) {
}
extraOpt := grpc.ConnectionTimeout(1_000_000_000)
opts, err := gss.getGrpcServerOptions(
context.Background(),
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
[]ToServerOption{WithGrpcServerOption(extraOpt)},
Expand Down Expand Up @@ -401,7 +402,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 10)
}
Expand Down Expand Up @@ -1144,9 +1145,13 @@ func (gts *grpcTraceServer) Export(ctx context.Context, _ ptraceotlp.ExportReque
}

func (gts *grpcTraceServer) startTestServer(t *testing.T, gss ServerConfig) (*grpc.Server, string) {
return gts.startTestServerWithHost(t, gss, componenttest.NewNopHost())
}

func (gts *grpcTraceServer) startTestServerWithHost(t *testing.T, gss ServerConfig, host component.Host, opts ...ToServerOption) (*grpc.Server, string) {
listener, err := gss.NetAddr.Listen(context.Background())
require.NoError(t, err)
server, err := gss.ToServer(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
server, err := gss.ToServer(context.Background(), host, componenttest.NewNopTelemetrySettings(), opts...)
require.NoError(t, err)
ptraceotlp.RegisterGRPCServer(server, gts)
go func() {
Expand All @@ -1155,8 +1160,30 @@ func (gts *grpcTraceServer) startTestServer(t *testing.T, gss ServerConfig) (*gr
return server, listener.Addr().String()
}

func (gts *grpcTraceServer) startTestServerWithHostError(_ *testing.T, gss ServerConfig, host component.Host, opts ...ToServerOption) (*grpc.Server, error) {
listener, err := gss.NetAddr.Listen(context.Background())
if err != nil {
return nil, err
}
defer listener.Close()

server, err := gss.ToServer(context.Background(), host, componenttest.NewNopTelemetrySettings(), opts...)
if err != nil {
return nil, err
}

ptraceotlp.RegisterGRPCServer(server, gts)
return server, nil
}

// sendTestRequest issues a ptraceotlp export request and captures metadata.
func sendTestRequest(t *testing.T, gcs ClientConfig) (ptraceotlp.ExportResponse, error) {
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
return sendTestRequestWithHost(t, gcs, componenttest.NewNopHost())
}

// sendTestRequestWithHost is similar to sendTestRequest but allows specifying the host
func sendTestRequestWithHost(t *testing.T, gcs ClientConfig, host component.Host) (ptraceotlp.ExportResponse, error) {
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), host, componenttest.NewNopTelemetrySettings())
require.NoError(t, errClient)
defer func() { assert.NoError(t, grpcClientConn.Close()) }()
c := ptraceotlp.NewGRPCClient(grpcClientConn)
Expand Down
Loading
Loading