Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/middleware-all.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: configmiddleware

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add configmiddleware, extensionmiddleware, and add support in gRPC and HTTP.

# One or more tracking issues or pull requests related to the change
issues: [12603, 9591]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ replaces:
- go.opentelemetry.io/collector/config/configcompression => ../../config/configcompression
- go.opentelemetry.io/collector/config/configgrpc => ../../config/configgrpc
- go.opentelemetry.io/collector/config/confighttp => ../../config/confighttp
- go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware
- go.opentelemetry.io/collector/config/confignet => ../../config/confignet
- go.opentelemetry.io/collector/config/configopaque => ../../config/configopaque
- go.opentelemetry.io/collector/config/configretry => ../../config/configretry
Expand Down Expand Up @@ -78,6 +79,7 @@ replaces:
- go.opentelemetry.io/collector/extension => ../../extension
- go.opentelemetry.io/collector/extension/extensionauth => ../../extension/extensionauth
- go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest => ../../extension/extensionauth/extensionauthtest
- go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware
- go.opentelemetry.io/collector/extension/extensioncapabilities => ../../extension/extensioncapabilities
- go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest
- go.opentelemetry.io/collector/extension/memorylimiterextension => ../../extension/memorylimiterextension
Expand Down
8 changes: 7 additions & 1 deletion cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module go.opentelemetry.io/collector/cmd/otelcorecol

go 1.23.0

toolchain go1.23.8
toolchain go1.24.1

require (
go.opentelemetry.io/collector/component v1.30.0
Expand Down Expand Up @@ -89,6 +89,7 @@ require (
go.opentelemetry.io/collector/config/configcompression v1.30.0 // indirect
go.opentelemetry.io/collector/config/configgrpc v0.124.0 // indirect
go.opentelemetry.io/collector/config/confighttp v0.124.0 // indirect
go.opentelemetry.io/collector/config/configmiddleware v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/config/confignet v1.30.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.30.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.30.0 // indirect
Expand All @@ -107,6 +108,7 @@ require (
go.opentelemetry.io/collector/exporter/xexporter v0.124.0 // indirect
go.opentelemetry.io/collector/extension/extensionauth v1.30.0 // indirect
go.opentelemetry.io/collector/extension/extensioncapabilities v0.124.0 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/extension/extensiontest v0.124.0 // indirect
go.opentelemetry.io/collector/extension/xextension v0.124.0 // indirect
go.opentelemetry.io/collector/featuregate v1.30.0 // indirect
Expand Down Expand Up @@ -185,6 +187,8 @@ replace go.opentelemetry.io/collector/config/configgrpc => ../../config/configgr

replace go.opentelemetry.io/collector/config/confighttp => ../../config/confighttp

replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware

replace go.opentelemetry.io/collector/config/confignet => ../../config/confignet

replace go.opentelemetry.io/collector/config/configopaque => ../../config/configopaque
Expand Down Expand Up @@ -249,6 +253,8 @@ replace go.opentelemetry.io/collector/extension/extensionauth => ../../extension

replace go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest => ../../extension/extensionauth/extensionauthtest

replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware

replace go.opentelemetry.io/collector/extension/extensioncapabilities => ../../extension/extensioncapabilities

replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest
Expand Down
2 changes: 2 additions & 0 deletions config/configgrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ README](../configtls/README.md).
- [`read_buffer_size`](https://godoc.org/google.golang.org/grpc#ReadBufferSize)
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
- [`auth`](../configauth/README.md)
- [`middlewares`](../configmiddleware/README.md)

Please note that [`per_rpc_auth`](https://pkg.go.dev/google.golang.org/grpc#PerRPCCredentials) which allows the credentials to send for every RPC is now moved to become an [extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/bearertokenauthextension). Note that this feature isn't about sending the headers only during the initial connection as an `authorization` header under the `headers` would do: this is sent for every RPC performed during an established connection.

Expand Down Expand Up @@ -111,3 +112,4 @@ see [confignet README](../confignet/README.md).
- [`tls`](../configtls/README.md)
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
- [`auth`](../configauth/README.md)
- [`middlewares`](../configmiddleware/README.md)
137 changes: 137 additions & 0 deletions config/configgrpc/client_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package configgrpc

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmiddleware"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/extension/extensionmiddleware"
)

// TestClientMiddlewareOrdering verifies that client middleware
// interceptors are called in the right order.
func TestClientMiddlewareOrdering(t *testing.T) {
// Create a middleware tracking header that will be modified by our middleware interceptors
const middlewareTrackingHeader = "middleware-sequence"

// Create middleware extensions that will modify the metadata to track their execution order
mockMiddleware1 := &mockClientMiddleware{id: "middleware-1"}
mockMiddleware2 := &mockClientMiddleware{id: "middleware-2"}

mockExt := map[component.ID]component.Component{
component.MustNewID("middleware1"): mockMiddleware1,
component.MustNewID("middleware2"): mockMiddleware2,
}

// Start a gRPC server that will record the incoming metadata
server := &grpcTraceServer{}
srv, addr := server.startTestServer(t, ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "localhost:0",
Transport: confignet.TransportTypeTCP,
},
})
defer srv.Stop()

// Create client config with middleware extensions
clientConfig := ClientConfig{
Endpoint: addr,
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
Middlewares: []configmiddleware.Middleware{
{
MiddlewareID: component.MustNewID("middleware1"),
},
{
MiddlewareID: component.MustNewID("middleware2"),
},
},
}

// Create a test host with our mock extensions
host := &mockHost{ext: mockExt}

// Send a request using the client with middleware
resp, err := sendTestRequestWithHost(t, clientConfig, host)
require.NoError(t, err)
assert.NotNil(t, resp)

// Verify that the middleware order was respected as recorded in the metadata
ictx, ok := metadata.FromIncomingContext(server.recordedContext)
require.True(t, ok, "middleware tracking header not found in metadata")
md := ictx[middlewareTrackingHeader]
require.Len(t, md, 1, "expected exactly one middleware tracking header value")

// The sequence should be "middleware-1,middleware-2" as that's the order they were registered
expectedSequence := "middleware-1,middleware-2"
assert.Equal(t, expectedSequence, md[0])
}

// mockClientMiddleware is a mock implementation of a middleware extension
type mockClientMiddleware struct {
id string
}

var (
_ component.Component = &mockClientMiddleware{}
_ extensionmiddleware.GRPCClient = &mockClientMiddleware{}
)

// Start implements component.Component
func (m *mockClientMiddleware) Start(context.Context, component.Host) error {
return nil
}

// Shutdown implements component.Component
func (m *mockClientMiddleware) Shutdown(context.Context) error {
return nil
}

// UnaryClientInterceptor intercepts unary calls and adds middleware ID to the tracking header
func (m *mockClientMiddleware) GetGRPCClientOptions() ([]grpc.DialOption, error) {
return []grpc.DialOption{grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// Get existing metadata or create new metadata
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
// Clone the metadata to avoid modifying the real metadata map
md = md.Copy()
}

// Check if there's already a middleware sequence header
sequence := ""
if values := md.Get("middleware-sequence"); len(values) > 0 {
sequence = values[0]
}

// Append this middleware's ID to the sequence
if sequence == "" {
sequence = m.id
} else {
sequence = fmt.Sprintf("%s,%s", sequence, m.id)
}

// Set the updated sequence
md.Set("middleware-sequence", sequence)

// Create a new context with the updated metadata
newCtx := metadata.NewOutgoingContext(ctx, md)

// Continue the call with our updated context
return invoker(newCtx, method, req, reply, cc, opts...)
})}, nil
}
30 changes: 28 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configmiddleware"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
Expand Down Expand Up @@ -103,6 +104,9 @@ type ClientConfig struct {

// Auth configuration for outgoing RPCs.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Middlewares for the gRPC client.
Middlewares []configmiddleware.Middleware `mapstructure:"middlewares,omitempty"`
}

// NewDefaultClientConfig returns a new instance of ClientConfig with default values.
Expand Down Expand Up @@ -189,6 +193,9 @@ type ServerConfig struct {

// Include propagates the incoming connection's metadata to downstream consumers.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`

// Middlewares for the gRPC server.
Middlewares []configmiddleware.Middleware `mapstructure:"middlewares,omitempty"`
}

// NewDefaultServerConfig returns a new instance of ServerConfig with default values.
Expand Down Expand Up @@ -362,6 +369,15 @@ func (gcs *ClientConfig) getGrpcDialOptions(
)
}

// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
for _, middleware := range gcs.Middlewares {
middlewareOptions, err := middleware.GetGRPCClientOptions(ctx, host.GetExtensions())
if err != nil {
return nil, fmt.Errorf("failed to get gRPC client options from middleware: %w", err)
}
opts = append(opts, middlewareOptions...)
}

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcDialOptionWrapper); ok {
opts = append(opts, wrapper.opt)
Expand Down Expand Up @@ -404,19 +420,20 @@ func (grpcServerOptionWrapper) isToServerOption() {}

// ToServer returns a [grpc.Server] for the configuration.
func (gss *ServerConfig) ToServer(
_ context.Context,
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToServerOption,
) (*grpc.Server, error) {
grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts)
grpcOpts, err := gss.getGrpcServerOptions(ctx, host, settings, extraOpts)
if err != nil {
return nil, err
}
return grpc.NewServer(grpcOpts...), nil
}

func (gss *ServerConfig) getGrpcServerOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToServerOption,
Expand Down Expand Up @@ -505,6 +522,15 @@ func (gss *ServerConfig) getGrpcServerOptions(

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
for _, middleware := range gss.Middlewares {
middlewareOptions, err := middleware.GetGRPCServerOptions(ctx, host.GetExtensions())
if err != nil {
return nil, fmt.Errorf("failed to get gRPC server options from middleware: %w", err)
}
opts = append(opts, middlewareOptions...)
}

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
opts = append(opts, wrapper.opt)
Expand Down
19 changes: 15 additions & 4 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestDefaultGrpcServerSettings(t *testing.T) {
Endpoint: "0.0.0.0:1234",
},
}
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 3)
}
Expand All @@ -312,6 +312,7 @@ func TestGrpcServerExtraOption(t *testing.T) {
}
extraOpt := grpc.ConnectionTimeout(1_000_000_000)
opts, err := gss.getGrpcServerOptions(
context.Background(),
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
[]ToServerOption{WithGrpcServerOption(extraOpt)},
Expand Down Expand Up @@ -401,7 +402,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 10)
}
Expand Down Expand Up @@ -1144,9 +1145,13 @@ func (gts *grpcTraceServer) Export(ctx context.Context, _ ptraceotlp.ExportReque
}

func (gts *grpcTraceServer) startTestServer(t *testing.T, gss ServerConfig) (*grpc.Server, string) {
return gts.startTestServerWithHost(t, gss, componenttest.NewNopHost())
}

func (gts *grpcTraceServer) startTestServerWithHost(t *testing.T, gss ServerConfig, host component.Host, opts ...ToServerOption) (*grpc.Server, string) {
listener, err := gss.NetAddr.Listen(context.Background())
require.NoError(t, err)
server, err := gss.ToServer(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
server, err := gss.ToServer(context.Background(), host, componenttest.NewNopTelemetrySettings(), opts...)
require.NoError(t, err)
ptraceotlp.RegisterGRPCServer(server, gts)
go func() {
Expand All @@ -1155,8 +1160,14 @@ func (gts *grpcTraceServer) startTestServer(t *testing.T, gss ServerConfig) (*gr
return server, listener.Addr().String()
}

// sendTestRequest issues a ptraceotlp export request and captures metadata.
func sendTestRequest(t *testing.T, gcs ClientConfig) (ptraceotlp.ExportResponse, error) {
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
return sendTestRequestWithHost(t, gcs, componenttest.NewNopHost())
}

// sendTestRequestWithHost is similar to sendTestRequest but allows specifying the host
func sendTestRequestWithHost(t *testing.T, gcs ClientConfig, host component.Host) (ptraceotlp.ExportResponse, error) {
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), host, componenttest.NewNopTelemetrySettings())
require.NoError(t, errClient)
defer func() { assert.NoError(t, grpcClientConn.Close()) }()
c := ptraceotlp.NewGRPCClient(grpcClientConn)
Expand Down
Loading