diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 2947f2f3820..92e7d5a0ddd 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -31,10 +31,12 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/config/configlimiter" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/extension/extensionauth" + "go.opentelemetry.io/collector/extension/xextension/limiter" ) var errMetadataNotFound = errors.New("no request metadata found") @@ -187,6 +189,11 @@ type ServerConfig struct { // Auth for this receiver Auth *configauth.Authentication `mapstructure:"auth,omitempty"` + // Limiters are a collection of limiter extensions. Each + // Limitation names an extension that is expected to implement + // limiter.Extension. They are called in order. + Limiters []configlimiter.Limitation `mapreduce:"limiters"` + // Include propagates the incoming connection's metadata to downstream consumers. IncludeMetadata bool `mapstructure:"include_metadata,omitempty"` } @@ -478,6 +485,8 @@ func (gss *ServerConfig) getGrpcServerOptions( var uInterceptors []grpc.UnaryServerInterceptor var sInterceptors []grpc.StreamServerInterceptor + // Initialize the auth extension first. + if gss.Auth != nil { authenticator, err := gss.Auth.GetServerAuthenticator(context.Background(), host.GetExtensions()) if err != nil { @@ -492,18 +501,46 @@ func (gss *ServerConfig) getGrpcServerOptions( }) } + // Apply client metadata. + + uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata)) + sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata)) + + // Apply limiter extensions (which see client metadata). + + var limitExts []limiter.Limiter + for _, named := range gss.Limiters { + lim, err := named.GetLimiter(context.Background(), host.GetExtensions()) + if err != nil { + return nil, err + } + limitExts = append(limitExts, lim) + } + if limitExts != nil { + uInterceptors = append(uInterceptors, func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return applyUnaryLimiters(ctx, req, info, handler, limitExts) + }) + sInterceptors = append(sInterceptors, func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return applyStreamLimiters(srv, ss, info, handler, limitExts) + }) + } + + // Enable OpenTelemetry observability plugin. + otelOpts := []otelgrpc.Option{ otelgrpc.WithTracerProvider(settings.TracerProvider), otelgrpc.WithPropagators(otel.GetTextMapPropagator()), otelgrpc.WithMeterProvider(settings.MeterProvider), } - // Enable OpenTelemetry observability plugin. + // Combine the interceptors, the observability plugin, with + // user-provided gRPC options. - uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata)) - sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata)) - - opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...)) + opts = append(opts, + grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), + grpc.ChainUnaryInterceptor(uInterceptors...), + grpc.ChainStreamInterceptor(sInterceptors...), + ) for _, opt := range extraOpts { if wrapper, ok := opt.(grpcServerOptionWrapper); ok { @@ -589,3 +626,61 @@ func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.Stre return handler(srv, wrapServerStream(ctx, stream)) } + +func applyUnaryLimiters(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, limiters []limiter.Limiter) (any, error) { + sz, err := sizeReq(req) + if err != nil { + return nil, err + } + + for _, lim := range limiters { + rel, err := lim.Acquire(ctx, sz) + if err != nil { + return nil, err + } + defer rel() + } + + return handler(ctx, req) +} + +func applyStreamLimiters(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, limiters []limiter.Limiter) error { + return handler(srv, limitServerStream(stream, limiters)) +} + +func sizeReq(req any) (uint64, error) { + switch treq := req.(type) { + case []byte: + return uint64(len(treq)), nil + default: + return 0, fmt.Errorf("limiter cannot determine size: %T", treq) + } +} + +// limitedServerStream is a thin wrapper around grpc.ServerStream that calls limiters. +type limitedServerStream struct { + grpc.ServerStream + + limiters []limiter.Limiter +} + +// limitServerStream returns a ServerStream that will call limiters. +func limitServerStream(stream grpc.ServerStream, limiters []limiter.Limiter) *limitedServerStream { + return &limitedServerStream{ServerStream: stream, limiters: limiters} +} + +func (ls *limitedServerStream) RecvMsg(req any) error { + sz, err := sizeReq(req) + if err != nil { + return err + } + for _, lim := range ls.limiters { + rel, err := lim.Acquire(ls.Context(), sz) + if err != nil { + return err + } + defer rel() + } + + return ls.ServerStream.RecvMsg(req) +} diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index e2695c86acb..45654658e69 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -10,10 +10,12 @@ require ( go.opentelemetry.io/collector/component/componenttest v0.121.0 go.opentelemetry.io/collector/config/configauth v0.121.0 go.opentelemetry.io/collector/config/configcompression v1.27.0 + go.opentelemetry.io/collector/config/configlimiter v0.0.0-00010101000000-000000000000 go.opentelemetry.io/collector/config/confignet v1.27.0 go.opentelemetry.io/collector/config/configopaque v1.27.0 go.opentelemetry.io/collector/config/configtls v1.27.0 go.opentelemetry.io/collector/extension/extensionauth v0.121.0 + go.opentelemetry.io/collector/extension/xextension v0.0.0-00010101000000-000000000000 go.opentelemetry.io/collector/pdata v1.27.0 go.opentelemetry.io/collector/pdata/testdata v0.121.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 @@ -79,3 +81,7 @@ replace go.opentelemetry.io/collector/component => ../../component replace go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest replace go.opentelemetry.io/collector/consumer => ../../consumer + +replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension + +replace go.opentelemetry.io/collector/config/configlimiter => ../configlimiter diff --git a/config/configlimiter/Makefile b/config/configlimiter/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/config/configlimiter/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/config/configlimiter/README.md b/config/configlimiter/README.md new file mode 100644 index 00000000000..6ed13bcd8cd --- /dev/null +++ b/config/configlimiter/README.md @@ -0,0 +1,45 @@ +# Limiter configuration + +This module defines necessary interfaces to implement limiter +extensions. Limiters are included in the basic HTTP and gRPC Server +Config structs, so users will rarely create interact with extensions +these directly. + +To imlpement a limiter extension, components should implement the +`GetClient` interface in +[`extension/xextension/limit`](#../../extension/xextension/limit/README.md). + +The currently known limiter extensions are listed below. + +## Limiter implementations + +- [Memory Limiter Extension](../../extension/memorylimiterextension/README.md) +- [Admission Limiter Extension](../../extension/admissionlimiterextension/README.md) + + +Example: + +```yaml +extensions: + # Used with gRPC traffic, consults GC statistics. + memory_limiter/cold + request_limit_mib: 100 + waiting_limit_mib: 10 + + # Used with HTTP traffic, counts request bytes in flight. + admission_limiter/warm: + request_limit_mib: 10 + waiting_limit_mib: 10 + +receivers: + otlp: + protocols: + http: + # ... + limiter: + - admission_limiter/warm + grpc: + # ... + limiter: + - memory_limiter/cold +``` diff --git a/config/configlimiter/configlimiter.go b/config/configlimiter/configlimiter.go new file mode 100644 index 00000000000..ec8e682171f --- /dev/null +++ b/config/configlimiter/configlimiter.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package configlimiter implements the configuration settings to +// perform admission control on byte-weighted requests. +package configlimiter // import "go.opentelemetry.io/collector/config/configlimiter" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension/xextension/limiter" +) + +var ( + errLimiterNotFound = errors.New("limiter not found") + errNotLimiter = errors.New("requested component is not a limiter") +) + +// Limitation defines the limit settings for a component. +type Limitation struct { + // LimiterID specifies the name of the extension to use in + // order to limit incoming requests. + LimiterID component.ID `mapstructure:"limiter,omitempty"` +} + +// GetLimiter attempts to select the appropriate extensionauth.Server +// from the list of extensions, based on the requested extension +// name. If an authenticator is not found, an error is returned. +func (l Limitation) GetLimiter(ctx context.Context, extensions map[component.ID]component.Component) (limiter.Limiter, error) { + if ext, found := extensions[l.LimiterID]; found { + if ext, ok := ext.(limiter.Extension); ok && ext != nil { + return ext.GetLimiter(ctx) + } + return nil, errNotLimiter + } + + return nil, fmt.Errorf("failed to resolve limiter %q: %w", l.LimiterID, errLimiterNotFound) +} diff --git a/config/configlimiter/configlimiter_test.go b/config/configlimiter/configlimiter_test.go new file mode 100644 index 00000000000..a4e81d664ae --- /dev/null +++ b/config/configlimiter/configlimiter_test.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package configlimiter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/extension/xextension/limiter" +) + +var mockID = component.MustNewID("mock") +var otherID = component.MustNewID("other") + +func TestGetLimiter(t *testing.T) { + testCases := []struct { + name string + cfg Limitation + limiter extension.Extension + expected error + }{ + { + name: "obtain limiter", + cfg: Limitation{mockID}, + limiter: limiter.NewNop(), + expected: nil, + }, + { + name: "wrong limiter", + cfg: Limitation{otherID}, + limiter: limiter.NewNop(), + expected: errNotLimiter, + }, + { + name: "missing limiter", + cfg: Limitation{component.MustNewID("missing")}, + limiter: limiter.NewNop(), + expected: errLimiterNotFound, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + ext := map[component.ID]component.Component{ + mockID: tt.limiter, + otherID: nil, + } + + limExt, err := tt.cfg.GetLimiter(context.Background(), ext) + + // verify + if tt.expected != nil { + require.ErrorIs(t, err, tt.expected) + assert.Nil(t, limExt) + } else { + require.NoError(t, err) + assert.NotNil(t, limExt) + } + }) + } +} diff --git a/config/configlimiter/go.mod b/config/configlimiter/go.mod new file mode 100644 index 00000000000..3149d38283d --- /dev/null +++ b/config/configlimiter/go.mod @@ -0,0 +1,39 @@ +module go.opentelemetry.io/collector/config/configlimiter + +go 1.23.0 + +require ( + github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/collector/component v1.27.0 + go.opentelemetry.io/collector/extension v1.27.0 + go.opentelemetry.io/collector/extension/xextension v0.0.0-00010101000000-000000000000 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector/pdata v1.27.0 // indirect + go.opentelemetry.io/otel v1.34.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/grpc v1.71.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/component => ../../component + +replace go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest + +replace go.opentelemetry.io/collector/extension => ../../extension + +replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension diff --git a/config/configlimiter/go.sum b/config/configlimiter/go.sum new file mode 100644 index 00000000000..0485cda44dd --- /dev/null +++ b/config/configlimiter/go.sum @@ -0,0 +1,88 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extension/xextension/go.mod b/extension/xextension/go.mod index 5e78b0b01c7..264a23dca5d 100644 --- a/extension/xextension/go.mod +++ b/extension/xextension/go.mod @@ -3,12 +3,15 @@ module go.opentelemetry.io/collector/extension/xextension go 1.23.0 require ( + github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.27.0 go.opentelemetry.io/collector/extension v1.27.0 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/pdata v1.27.0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect @@ -21,6 +24,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect google.golang.org/grpc v1.71.0 // indirect google.golang.org/protobuf v1.36.5 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace go.opentelemetry.io/collector/extension => ../ diff --git a/extension/xextension/go.sum b/extension/xextension/go.sum index ea249180e5a..0485cda44dd 100644 --- a/extension/xextension/go.sum +++ b/extension/xextension/go.sum @@ -14,8 +14,14 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -75,5 +81,8 @@ google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extension/xextension/limiter/README.md b/extension/xextension/limiter/README.md new file mode 100644 index 00000000000..5dbd353519f --- /dev/null +++ b/extension/xextension/limiter/README.md @@ -0,0 +1,47 @@ +# Storage + +**Status: under development** + +A limiter extension supports flexible methods for admission and rate +control. Other components, typically receivers, can request a limiter +from the extension and use it to admit requests. + +This interface is byte-weight oriented, enabling limiting for +individual payloads within a stream-oriented RPC. For request-level +limiting, consider using the Auth extension instead. + +The `limiter.Extension` interface extends `component.Extension` by +adding the following method: + +``` +GetLimiter(context.Context, component.Kind, component.ID) (Client, error) +``` + +After the context argument are two component-level identifiers which +identify the component that will request to be limited, which is the +name of the extension. Typically, components will support a list of +named limiters to apply in sequence, e.g., + +``` +receivers: + someprotocol: + limiters: + - rate + - memory +``` + +The `limiter.Limiter` interface contains the following method: + +``` +Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error) +``` + +The weight parameter specifies how large of an request to consider in +the limiter, which should estimate the amount of real memory used by +to represent the data after it is uncompressed. + +The result is either a non-nil release function and nil error, or a +nil release function and non-nil error. The limiter may block or fail +fast, at its discretion. When a non-nil release function is returned, +the component is responsible for calling the release function after +the memory is no longer used. diff --git a/extension/xextension/limiter/doc.go b/extension/xextension/limiter/doc.go new file mode 100644 index 00000000000..87b1305bb37 --- /dev/null +++ b/extension/xextension/limiter/doc.go @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package limiter implements an extension that can limit requests by +// blocking or failing them through calls to acquire and release. +package limiter // import "go.opentelemetry.io/collector/extension/xextension/limiter" diff --git a/extension/xextension/limiter/limit.go b/extension/xextension/limiter/limit.go new file mode 100644 index 00000000000..c820c9ebdb8 --- /dev/null +++ b/extension/xextension/limiter/limit.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiter // import "go.opentelemetry.io/collector/extension/xextension/limiter" + +import ( + "context" + + "go.opentelemetry.io/collector/extension" +) + +// Extension is the interface that storage extensions must implement +type Extension interface { + extension.Extension + + // GetLimiter will create a limiter for use by a component. + // The component can use the limiter to limit admission to the + // pipeline. + GetLimiter(ctx context.Context) (Limiter, error) +} + +// Limiter implements a limiter for byte-weighted request admission. +type Limiter interface { + // Acquire asks the controller to admit the caller. + // + // The weight parameter specifies how large of an request to + // consider in the limiter. This should be a measure of the + // size of the request after it is uncompressed. One way to + // compute the weight of a request is byte its OTLP encoding + // size using the appropriate pdata `Sizer` type. However, + // components are encouraged to use an approximation, not + // necessarily based on the OTLP encoding size, if there is an + // efficient substitute. + // + // The goal is for the estimated weight to approximate the + // amount of real memory that will be occupied while the + // request is in flight. If the component uses a protocol + // with less repetition than OTLP, it is possible for the + // Sizer to overestimate the amount of real memory used, + // because Golang's immutable string value allows + // it. Therefore, components should prefer an inexpensive, + // approximate method to determine weight in bytes. + // + // The limiter is permitted to block the request. After making + // its decision, the return value is exclusively a function + // value or an error. Admit will return when one of the + // following events occurs: + // + // (1) admission is allowed, or + // (2) the provided ctx becomes canceled, or + // (3) the limiter determines that the request should fail. + // + // In case (1), the return value will be a non-nil + // ReleaseFunc. The caller must invoke it after it is finished + // with the resource being guarded by the admission + // controller. + // + // In case (2), the return value should be similar to: + // - gRPC: Cancelled or DeadlineExceeded error + // - HTTP: Status 408. + // + // In case (3), the return value should be similar to: + // - gRPC: ResourceExhausted error + // - HTTP: Status 429. + Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error) +} + +// ReleaseFunc is returned by Acquire when the Acquire() was admitted. +type ReleaseFunc func() diff --git a/extension/xextension/limiter/limit_test.go b/extension/xextension/limiter/limit_test.go new file mode 100644 index 00000000000..2d0c760d92e --- /dev/null +++ b/extension/xextension/limiter/limit_test.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNopLimiter(t *testing.T) { + ctx := context.Background() + nop := NewNop() + + lim, err := nop.GetLimiter(ctx) + require.NoError(t, err) + + rel, err := lim.Acquire(ctx, 1000) + require.NoError(t, err) + rel() +} diff --git a/extension/xextension/limiter/metadata.yaml b/extension/xextension/limiter/metadata.yaml new file mode 100644 index 00000000000..b5c23968ada --- /dev/null +++ b/extension/xextension/limiter/metadata.yaml @@ -0,0 +1,10 @@ +type: xextension +github_project: open-telemetry/opentelemetry-collector + +status: + class: extension + codeowners: + active: + - jmacd + stability: + development: [traces, metrcs, logs, profiles] diff --git a/extension/xextension/limiter/nop_client.go b/extension/xextension/limiter/nop_client.go new file mode 100644 index 00000000000..cc0c59bda7e --- /dev/null +++ b/extension/xextension/limiter/nop_client.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiter // import "go.opentelemetry.io/collector/extension/xextension/limiter" + +import ( + "context" + + "go.opentelemetry.io/collector/extension" +) + +type nopExtension struct { + extension.Extension +} + +var nopExtensionInstance Extension = &nopExtension{} + +type nopLimiter struct{} + +var nopLimiterInstance Limiter = &nopLimiter{} + +// NewNop returns a limiter extension that does nothing. +func NewNop() Extension { + return nopExtensionInstance +} + +// Acquire implements Limiter. +func (nopExtension) GetLimiter(_ context.Context) (Limiter, error) { + return nopLimiterInstance, nil +} + +func (nopLimiter) Acquire(_ context.Context, _ uint64) (ReleaseFunc, error) { + return func() {}, nil +}