diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 604b0cd4c67..e111d679c1e 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -82,6 +82,7 @@ replaces: - go.opentelemetry.io/collector/extension/extensioncapabilities => ../../extension/extensioncapabilities - go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware - go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest + - go.opentelemetry.io/collector/extension/extensionlimiter => ../../extension/extensionlimiter - go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest - go.opentelemetry.io/collector/extension/memorylimiterextension => ../../extension/memorylimiterextension - go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 549eef528d6..aa1d73cd713 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -4,7 +4,7 @@ module go.opentelemetry.io/collector/cmd/otelcorecol go 1.23.0 -toolchain go1.23.9 +toolchain go1.24.1 require ( go.opentelemetry.io/collector/component v1.32.0 @@ -111,7 +111,8 @@ require ( go.opentelemetry.io/collector/exporter/xexporter v0.126.0 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.32.0 // indirect go.opentelemetry.io/collector/extension/extensioncapabilities v0.126.0 // indirect - go.opentelemetry.io/collector/extension/extensionmiddleware v0.126.0 // indirect + go.opentelemetry.io/collector/extension/extensionlimiter v0.0.0-00010101000000-000000000000 // indirect + go.opentelemetry.io/collector/extension/extensionmiddleware v1.30.0 // indirect go.opentelemetry.io/collector/extension/extensiontest v0.126.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.126.0 // indirect go.opentelemetry.io/collector/featuregate v1.32.0 // indirect @@ -263,6 +264,8 @@ replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../ext replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest +replace go.opentelemetry.io/collector/extension/extensionlimiter => ../../extension/extensionlimiter + replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest replace go.opentelemetry.io/collector/extension/memorylimiterextension => ../../extension/memorylimiterextension diff --git a/extension/extensionlimiter/Makefile b/extension/extensionlimiter/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/extension/extensionlimiter/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/extension/extensionlimiter/README.md b/extension/extensionlimiter/README.md new file mode 100644 index 00000000000..4656ebece4f --- /dev/null +++ b/extension/extensionlimiter/README.md @@ -0,0 +1,352 @@ +# OpenTelemetry Collector Extension Limiter Package + +**Document status: development** + +The `extensionlimiter` package provides interfaces for rate limiting +and resource limiting in the OpenTelemetry Collector, enabling control +over data flow and resource usage through extensions that can be +configured through middleware and/or directly by pipeline components. + +## Overview + +This package defines two primary limiter **kinds**, which have +different interfaces: + +- **Rate Limiters**: Control time-based limits on quantities such as + bytes or items per second. +- **Resource Limiters**: Manage physical limits on quantities such as + concurrent requests or memory usage. + +Both limiter kinds are unified through the `LimiterWrapper` interface, +which simplifies consumers in most cases by providing a consistent +`LimitCall` interface. + +A limiter is **saturated** by definition when a limit is completely +overloaded in at least one weight, generally it means callers should +immediately deny work to continue on the request. + +Each kind of limiter as well as the wrapper type have corresponding +**provider** interfaces that return a limiter instance based on a +weight keys. + +Weight keys describe the standard limiting dimensions. There are +currently four standard weight keys: network bytes, request count, +request items, and memory size. Callers use the `Checker` interface +to check whether any weight keys (from a set) are saturated. + +## Key Interfaces + +- `LimiterWrapper`: Provides a callback-based limiting interface that + works with both rate and resource limiters, has a `LimitCall` method, + plus a provider type. +- `RateLimiter`: Applies time-based limits, has a `Limit` method, + plus provider type. +- `ResourceLimiter`: Manages physical resource limits, has + an `Acquire` method and a corresponding `ReleaseFunc`, + plus a provider type. +- `Checker`: Has a `MustDeny` method. + +### Limiter helpers + +The `limiterhelper` subpackage provides: + +- Consumer wrappers apply limits to a collector pipeline (e.g., + `NewLimitedLogs` for a limiter combined with `consumer.NewLogs`) +- Multi-limiter combinators: for simple combined limiter functionality +- Middleware conversion utilities: convert middleware configurations + to limiter providers. + +## Recommendations + +For general use cases, prefer the `LimiterWrapper` interface with its +callback-based approach because it is agnostic to the difference between +rate and resource limiters. + +Use the direct `RateLimiter` or `ResourceLimiter` interfaces only in +special cases where control flow can't be easily scoped. + +Middleware configuration typically automates the configuration of +network bytes and request count weight keys relatively early in a +pipeline. Receivers are responsible for limiting request items and +memory size through one of the available helpers. + +Processors can apply limiters for specific reasons, for example to +apply limits in data-dependent ways. Exporters can apply limiters for +the same reasons, for example to apply limits in destination-dependent +ways. + +### Limiter blocking and failing + +Limiters implementations MAY block the request or fail immediately, +subject to internal logic. A limiter aims to avoid waste, which +requires balancing several factors. To fail a request that has already +been transmitted, received and parsed is sometimes more wasteful than +waiting for a little while; on the other hand waiting for a long time +risks wasting memory. In general, an overloaded limiter that is +saturated SHOULD fail requests immediately. + +Limiter implementations SHOULD consider the context deadline when +they block. If the deadline is likely to expire before the limit +becomes available, they should return a standard overload signal. + +### Limiter saturation + +Rate and resource limiter providers have a `GetChecker` method to +provide a `Checker`, featuring a `MustDeny` method which is made +available for applications to test when any limit is fully +saturated that would eventually deny the request. + +The `Checker` is consulted at least once and applies to all weight +keys. Because a `Checker` can be consulted more than once by a +receiver and/or middleware, it is possible for requests to be denied +over the saturation of limits they were already granted. Users should +configure external load balancers and/or horizontal scaling policies +to avoid cases of limiter saturation. + +### Limit before or after use + +It is sometimes possible to request a limit before it is actually +used. As an example, consider a protocol using a compressed payload, +such that the receiver knows how much memory will be allocated before +the fact. In this case the receiver can request the limit before using +it, but this will not always be the case. Generally, prefer to limit +before use, but either way be consistent. + +When using the low-level interfaces directly, limits SHOULD be applied +before creating new concurrent work. + +### Examples + +#### OTLP receiver + +Limiters applied through middleware are an implementation detail, +simply configure them using `configgrpc` or `confighttp`. For the +OTLP receiver (e.g., with two `ratelimiter` extensions): + +```yaml +extensions: + ratelimiter/limit_for_grpc: + # rate limiter settings for gRPC + ratelimiter/limit_for_grpc: + # rate limiter settings for HTTP + +receivers: + otlp: + protocols: + grpc: + middlewares: + - ratelimiter/limit_for_grpc + http: + middlewares: + - ratelimiter/limit_for_http +``` + +Note that the OTLP receiver specifically supports multiple protocols +with separate middleware configurations, thus it configures limiters +for request items and memory size on a protocol-by-protocol basis. + +#### HTTP metrics scraper + +A HTTP pull-based receiver can implement a basic limited scraper loop +as follows. The HTTP client config object's `middlewares` field +automatically configures network bytes and request count limits: + +```yaml +receivers: + scraper: + http: + middlewares: + - ratelimiter/scraper +``` + +Limiter extensions are derived from a host, a middlewares list, and a +list of weight keys. When middleware is configurable at the factory +level, it may be added via `receiver.NewFactory` using +`receiver.WithLimiters(getLimiters)`: + +```golang +func NewFactory() receiver.Factory { + return xreceiver.NewFactory( + metadata.Type, + createDefaultConfig, + xreceiver.WithMetrics(createMetrics, metadata.MetricsStability), + xreceiver.WithLimiters(getLimiters), + ) +} +``` + +Here, `getLimiters` is a function to get the effective +`[]configmiddleware.Config` and derive pipeline consumers using +`limiterhelper` adapters. + +To acquire a limiter, use `MiddlewaresToLimiterWrapperProvider` to +obtain a combined limiter wrapper around the input `nextMetrics` +consumer. It will pass `StandardNotMiddlewareKeys()` indicating to +apply request items and memory size: + +```golang + // Extract limiter provider from middlewares. + s.limiterProvider, err = limiterhelper.MiddlewaresToLimiterWrapperProvider( + cfg.Middlewares) + if err != nil { ... } + + // Extract a checker from the provider + s.checker, err = s.limiterProvider.GetChecker() + if err != nil { ... } + + // Here get a limiter-wrapped pipeline and a combination of weight-specific + // limiters for MustDeny() functionality. + s.nextMetrics, err = limiterhelper.NewLimitedMetrics( + s.nextMetrics, limiterhelper.StandardNotMiddlewareKeys(), s.limiterProvider) + if err != nil { ... } +``` + +In the scraper loop, use `MustDeny` before starting a scrape: + +```golang +func (s *scraper) scrapeOnce(ctx context.Context) error { + if err := s.checker.MustDeny(ctx); err != nil { + return err + } + + // Network bytes and request count limits are applied in middleware. + // before this returns: + data, err := s.getData(ctx) + if err != nil { + return err + } + + // Request items and memory size are applied in the pipeline. + return s.nextMetrics.ConsumeMetrics(ctx, data) +} +``` + +#### gRPC stream receiver + +A gRPC streaming receiver that holds memory across its allocated in +`Send()` and does not release it until after a corresponding `Recv()` +requires use of the lower-level `ResourceLimiter` interface. +The gRPC config object's `middlewares` field +automatically configures network bytes and request count limits: + +```yaml +receivers: + streamer: + grpc: + middlewares: + - ratelimiter/streamer +``` + +The receiver will check `s.checker.MustDeny()` as above. In a stream, +limiters are expected to block the stream until limit requests +succeed, however after the limit requests succeed, the receiver may +wish to return from `Send()` to continue accepting new requests while +the consumer works in a separate goroutine. The limit will be released +after the consumer returns. + +```golang +func (s *scraper) LogsStream(ctx context.Context, stream *Stream) error { + for { + // Check saturation for all limiters, all keys. + err := s.checker.MustDeny(ctx) + if err != nil { ... } + + // The network bytes and request count limits are applied in middleware. + req, err := stream.Recv() + if err != nil { ... } + + // Allocate memory objects. + data, err := s.getLogs(ctx, req) + if err != nil { ... } + + release, err := s.memorySizeLimiter.Acquire(ctx, pdataSize(data)) + if err != nil { ... } + + go func() { + // Request items limit is applied in the pipeline consumer + err := s.nextMetrics.ConsumeMetrics(ctx, data) + + // Release the memory. + release() + + // Reply to the caller. + stream.Send(streamResponseFromConsumerError(err)) + } + } +} +``` + +#### Open questions + +##### Middleware implementation details + +Details are +important. [#12700](https://github.com/open-telemetry/opentelemetry-collector/pull/12700) +contained a `limitermiddleware` implementation which was a middleware +that called a limiter for HTTP and gRPC. Roughly the same code will be +used, and the details will come out. + +##### Provider options + +An `Option` type has been added as a placeholder in the provider +interfaces. **NOTE: No options are implemented.** Potential options: + +- The protocol name +- The signal kind +- The caller's component ID + +Because the set of each of these is small, it is possible to +pre-compute limiter instances for the cross product of configurations. + +##### Context-dependent limits + +Client metadata (i.e., headers) may be used in the context to make +limiter decisions. These details are automatically extracted from the +Context passed to `MustDeny`, `Limit`, `Acquire`, and `LimitCall` +functions. No examples are provided. How will limiters configure, for +example, tenant-specific limits? + +##### Data-dependent limits + +When a single unit of data contains limits that are assignable to +multiple distinct limiters, one option available to users is to split +requests and add to their context and run them concurrently through +context-dependent limiters. See +[#39199](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/39199). + +Another option is to add support for non-blocking limit requests. For +example, to apply limits using information derived from the +OpenTelemetry resource, we might do something like this pseudo-code: + +``` +func (p *processor) limitLogs(ctx context.Context, logsData plog.Logs) (plog.Logs, extensionlimiter.ReleaseFunc, error) { + var rels extensionlimiter.ReleaseFuncs + logsData.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool { + md := resourceToMetadata(rl.Resource()) + rel, err := p.nonBlockingLimiter.TryLimitOrAcquire(withMetadata(ctx, md)) + if err != nil { + return false + } + rels = append(rels, rel) + return true + }) + if logsData.ResourceLogs().Len() == 0 { + return logsData, func() {}, processorhelper.ErrSkipProcessingData + } + return logsData, rels.Release, nil +} + +func (p *processor) ConsumeLogs(ctx context.Context, logsData plog.Logs) error { + logsData, release, err = limitLogs(ctx, logsData) + if err != nil { + return err + } + defer release() + return p.nextLogs.ConsumeLogs(ctx, logsData) +} +``` + +Here, the release a new `TryLimitOrAcquire` function abstracts the +form of a non-blocking call to either form of limiter. If the +underyling limiter is a rate limiter, the release function will be a +no-op. diff --git a/extension/extensionlimiter/extensionlimiter.go b/extension/extensionlimiter/extensionlimiter.go new file mode 100644 index 00000000000..04789e79dab --- /dev/null +++ b/extension/extensionlimiter/extensionlimiter.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extensionlimiter // import "go.opentelemetry.io/collector/extension/extensionlimiter" + +import ( + "context" +) + +// Option is passed to limiter providers. +// +// NOTE: For data-specific or tenant-specific limits we will extend +// providers with Options and add a Config type, but none are +// supported yet and this PR contains only interfaces, not need for +// options in core repository components. +type Option interface { + apply() +} + +// Checker is for checking when a limit is saturated. This can be +// called prior to the start of work to check for limiter saturation. +type Checker interface { + // MustDeny is a request to apply a hard limit. If this + // returns non-nil, the caller must not begin new work in this + // context. + MustDeny(context.Context) error +} + +// MustDenyFunc is a functional way to build MustDeny functions. +type MustDenyFunc func(context.Context) error + +// A MustDeny function is a complete Checker. +var _ Checker = MustDenyFunc(nil) + +// MustDeny implements Checker. +func (f MustDenyFunc) MustDeny(ctx context.Context) error { + if f == nil { + return nil + } + return f(ctx) +} + +// CheckerProvider is an interface to obtain checkers for a group of +// weight keys. +type CheckerProvider interface { + // GetChecker returns a checker for a group of weight keys. + GetChecker(...Option) (Checker, error) +} + +// GetCheckerFunc is a functional way to construct GetChecker +// functions, used in limiter providers. +type GetCheckerFunc func(...Option) (Checker, error) + +// Checker implements CheckerProvider. +func (f GetCheckerFunc) GetChecker(opts ...Option) (Checker, error) { + if f == nil { + return nil, nil + } + return f(opts...) +} diff --git a/extension/extensionlimiter/go.mod b/extension/extensionlimiter/go.mod new file mode 100644 index 00000000000..76fc85c8217 --- /dev/null +++ b/extension/extensionlimiter/go.mod @@ -0,0 +1,66 @@ +module go.opentelemetry.io/collector/extension/extensionlimiter + +go 1.23.0 + +require ( + go.opentelemetry.io/collector/component v1.32.0 + go.opentelemetry.io/collector/config/configmiddleware v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/collector/consumer v1.32.0 + go.opentelemetry.io/collector/consumer/xconsumer v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/collector/extension v1.32.0 + go.opentelemetry.io/collector/pdata v1.32.0 + go.opentelemetry.io/collector/pdata/pprofile v0.126.0 +) + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/extension/extensionmiddleware v1.30.0 // indirect + go.opentelemetry.io/collector/featuregate v1.32.0 // indirect + go.opentelemetry.io/collector/internal/telemetry v0.126.0 // indirect + go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect + go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel/log v0.11.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/sdk v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.35.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/grpc v1.72.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect +) + +replace go.opentelemetry.io/collector/consumer => ../../consumer + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate + +replace go.opentelemetry.io/collector/component => ../../component + +replace go.opentelemetry.io/collector/consumer/xconsumer => ../../consumer/xconsumer + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile + +replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware + +replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware + +replace go.opentelemetry.io/collector/pipeline => ../../pipeline + +replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../extensionmiddleware/extensionmiddlewaretest + +replace go.opentelemetry.io/collector/extension => ../ + +replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry diff --git a/extension/extensionlimiter/go.sum b/extension/extensionlimiter/go.sum new file mode 100644 index 00000000000..26de4444ce4 --- /dev/null +++ b/extension/extensionlimiter/go.sum @@ -0,0 +1,97 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 h1:ojdSRDvjrnm30beHOmwsSvLpoRF40MlwNCA+Oo93kXU= +go.opentelemetry.io/contrib/bridges/otelzap v0.10.0/go.mod h1:oTTm4g7NEtHSV2i/0FeVdPaPgUIZPfQkFbq0vbzqnv0= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/log v0.11.0 h1:c24Hrlk5WJ8JWcwbQxdBqxZdOK7PcP/LFtOtwpDTe3Y= +go.opentelemetry.io/otel/log v0.11.0/go.mod h1:U/sxQ83FPmT29trrifhQg+Zj2lo1/IPN1PF6RTFqdwc= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= +google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= +google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extension/extensionlimiter/limiterhelper/checker.go b/extension/extensionlimiter/limiterhelper/checker.go new file mode 100644 index 00000000000..fcad04b4c18 --- /dev/null +++ b/extension/extensionlimiter/limiterhelper/checker.go @@ -0,0 +1,33 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiterhelper // import "go.opentelemetry.io/collector/extension/extensionlimiter/limiterhelper" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/extension/extensionlimiter" +) + +// MultiChecker returns MustDeny when any element returns MustDeny. +type MultiChecker []extensionlimiter.Checker + +var _ extensionlimiter.Checker = MultiChecker{} + +// MustDeny implements Checker. +func (ls MultiChecker) MustDeny(ctx context.Context) error { + var err error + for _, lim := range ls { + if lim == nil { + continue + } + err = errors.Join(err, lim.MustDeny(ctx)) + } + return err +} + +// NeverDeny returns a Checker that never denies. +func NeverDeny() extensionlimiter.Checker { + return extensionlimiter.MustDenyFunc(nil) +} diff --git a/extension/extensionlimiter/limiterhelper/consumer.go b/extension/extensionlimiter/limiterhelper/consumer.go new file mode 100644 index 00000000000..c4eb3aa7d56 --- /dev/null +++ b/extension/extensionlimiter/limiterhelper/consumer.go @@ -0,0 +1,217 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiterhelper // import "go.opentelemetry.io/collector/extension/extensionlimiter/limiterhelper" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/extension/extensionlimiter" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Traits object interface is generalized by P the pipeline data type +// (e.g., ptrace.Traces) and C the consumer type (e.g., +// consumer.Traces) +type traits[P, C any] interface { + // itemCount is SpanCount(), DataPointCount(), or LogRecordCount(). + itemCount(P) uint64 + // memorySize uses the appropriate protobuf Sizer as a proxy + // for memory used. + memorySize(data P) uint64 + // consume calls the appropriate consumer method (e.g., ConsumeTraces) + consume(ctx context.Context, data P, next C) error + // create is a functional constructor the consumer type (e.g., consumer.NewTraces) + create(func(ctx context.Context, data P) error, ...consumer.Option) (C, error) +} + +// Traces traits + +type traceTraits struct{} + +func (traceTraits) itemCount(data ptrace.Traces) uint64 { + return uint64(data.SpanCount()) +} + +func (traceTraits) memorySize(data ptrace.Traces) uint64 { + var sizer ptrace.MarshalSizer + return uint64(sizer.TracesSize(data)) +} + +func (traceTraits) create(next func(ctx context.Context, data ptrace.Traces) error, opts ...consumer.Option) (consumer.Traces, error) { + return consumer.NewTraces(next, opts...) +} + +func (traceTraits) consume(ctx context.Context, data ptrace.Traces, next consumer.Traces) error { + return next.ConsumeTraces(ctx, data) +} + +// Metrics traits + +type metricTraits struct{} + +func (metricTraits) itemCount(data pmetric.Metrics) uint64 { + return uint64(data.DataPointCount()) +} + +func (metricTraits) memorySize(data pmetric.Metrics) uint64 { + var sizer pmetric.MarshalSizer + return uint64(sizer.MetricsSize(data)) +} + +func (metricTraits) create(next func(ctx context.Context, data pmetric.Metrics) error, opts ...consumer.Option) (consumer.Metrics, error) { + return consumer.NewMetrics(next, opts...) +} + +func (metricTraits) consume(ctx context.Context, data pmetric.Metrics, next consumer.Metrics) error { + return next.ConsumeMetrics(ctx, data) +} + +// Logs traits + +type logTraits struct{} + +func (logTraits) itemCount(data plog.Logs) uint64 { + return uint64(data.LogRecordCount()) +} + +func (logTraits) memorySize(data plog.Logs) uint64 { + var sizer plog.MarshalSizer + return uint64(sizer.LogsSize(data)) +} + +func (logTraits) create(next func(ctx context.Context, data plog.Logs) error, opts ...consumer.Option) (consumer.Logs, error) { + return consumer.NewLogs(next, opts...) +} + +func (logTraits) consume(ctx context.Context, data plog.Logs, next consumer.Logs) error { + return next.ConsumeLogs(ctx, data) +} + +// Profiles traits + +type profileTraits struct{} + +func (profileTraits) itemCount(data pprofile.Profiles) uint64 { + return uint64(data.SampleCount()) +} + +func (profileTraits) memorySize(data pprofile.Profiles) uint64 { + var sizer pprofile.MarshalSizer + return uint64(sizer.ProfilesSize(data)) +} + +func (profileTraits) create(next func(ctx context.Context, data pprofile.Profiles) error, opts ...consumer.Option) (xconsumer.Profiles, error) { + return xconsumer.NewProfiles(next, opts...) +} + +func (profileTraits) consume(ctx context.Context, data pprofile.Profiles, next xconsumer.Profiles) error { + return next.ConsumeProfiles(ctx, data) +} + +// limitOne obtains a LimiterWrapper and applies a single weight limit. +func limitOne[P any, C any]( + next C, + keys extensionlimiter.WeightSet, + provider LimiterWrapperProvider, + m traits[P, C], + key extensionlimiter.WeightKey, + opts []consumer.Option, + quantify func(P) uint64, +) (C, error) { + if !keys.Contains(key) { + return next, nil + } + lim, err := provider.GetLimiterWrapper(key) + if err != nil { + return next, err + } + if lim == nil { + return next, nil + } + return m.create(func(ctx context.Context, data P) error { + return lim.LimitCall(ctx, quantify(data), func(ctx context.Context) error { + return m.consume(ctx, data, next) + }) + }, opts...) +} + +// applyChecker gets a Checker and wraps the pipeline in a MustDeny +// check. +func applyChecker[P any, C any]( + next C, + keys extensionlimiter.WeightSet, + provider LimiterWrapperProvider, + m traits[P, C], + opts []consumer.Option, +) (C, error) { + ck, err := provider.GetChecker() + if err != nil { + return next, err + } + return m.create(func(ctx context.Context, data P) error { + if err := ck.MustDeny(ctx); err != nil { + return err + } + return m.consume(ctx, data, next) + }, opts...) +} + +// newLimited is signal-generic limiting logic. +func newLimited[P any, C any]( + next C, + keys extensionlimiter.WeightSet, + provider LimiterWrapperProvider, + m traits[P, C], + opts ...consumer.Option, +) (C, error) { + if provider == nil { + return next, nil + } + var err1, err2, err3, err4 error + // Note: reverse order of evaluation cost => least-cost applied first. + next, err1 = limitOne(next, keys, provider, m, extensionlimiter.WeightKeyMemorySize, opts, + func(data P) uint64 { + return m.memorySize(data) + }) + next, err2 = limitOne(next, keys, provider, m, extensionlimiter.WeightKeyRequestItems, opts, + func(data P) uint64 { + return m.itemCount(data) + }) + next, err3 = limitOne(next, keys, provider, m, extensionlimiter.WeightKeyRequestCount, opts, + func(_ P) uint64 { + return 1 + }) + next, err4 = applyChecker(next, keys, provider, m, opts) + return next, errors.Join(err1, err2, err3, err4) +} + +// NewLimitedTraces applies a limiter using the provider over keys before calling next. +func NewLimitedTraces(next consumer.Traces, keys extensionlimiter.WeightSet, provider LimiterWrapperProvider) (consumer.Traces, error) { + return newLimited(next, keys, provider, traceTraits{}, + consumer.WithCapabilities(next.Capabilities())) +} + +// NewLimitedLogs applies a limiter using the provider over keys before calling next. +func NewLimitedLogs(next consumer.Logs, keys extensionlimiter.WeightSet, provider LimiterWrapperProvider) (consumer.Logs, error) { + return newLimited(next, keys, provider, logTraits{}, + consumer.WithCapabilities(next.Capabilities())) +} + +// NewLimitedMetrics applies a limiter using the provider over keys before calling next. +func NewLimitedMetrics(next consumer.Metrics, keys extensionlimiter.WeightSet, provider LimiterWrapperProvider) (consumer.Metrics, error) { + return newLimited(next, keys, provider, metricTraits{}, + consumer.WithCapabilities(next.Capabilities())) +} + +// NewLimitedProfiles applies a limiter using the provider over keys before calling next. +func NewLimitedProfiles(next xconsumer.Profiles, keys extensionlimiter.WeightSet, provider LimiterWrapperProvider) (xconsumer.Profiles, error) { + return newLimited(next, keys, provider, profileTraits{}, + consumer.WithCapabilities(next.Capabilities())) +} diff --git a/extension/extensionlimiter/limiterhelper/middleware.go b/extension/extensionlimiter/limiterhelper/middleware.go new file mode 100644 index 00000000000..aabcc175aa3 --- /dev/null +++ b/extension/extensionlimiter/limiterhelper/middleware.go @@ -0,0 +1,166 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiterhelper // import "go.opentelemetry.io/collector/extension/extensionlimiter/limiterhelper" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configmiddleware" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/extension/extensionlimiter" +) + +var ( + ErrNotALimiter = errors.New("middleware is not a limiter") + ErrLimiterConflict = errors.New("limiter implements both rate and resource-limiters") + ErrUnresolvedLimiter = errors.New("could not resolve middleware limiter") +) + +// MiddlewareIsLimiter returns true if a middleware configuration +// represents a valid limiter, returns false for not found or invalid +// cases. If the named extension is found but is not a limiter, +// returns (false, nil). +func MiddlewareIsLimiter(host component.Host, middleware configmiddleware.Config) (bool, error) { + _, ok, err := middlewareIsLimiter(host, middleware) + return ok, err +} + +// MiddlewaresToLimiterWrapperProvider constructs a combined limiter +// from an ordered list of middlewares. This constructor ignores +// middleware configs that are not limiters. +// +// When no limiters are found (with no errors), the returned provider +// is nil. When a nil is passed to the consumer helpers (e.g., +// NewLimitedLogs) it will pass-through when the limiter is nil. +func MiddlewaresToLimiterWrapperProvider(host component.Host, middleware []configmiddleware.Config) (LimiterWrapperProvider, error) { + var retErr error + var providers []LimiterWrapperProvider + for _, mid := range middleware { + ok, err := MiddlewareIsLimiter(host, mid) + retErr = errors.Join(retErr, err) + if !ok { + continue + } + provider, err := MiddlewareToLimiterWrapperProvider(host, mid) + providers = append(providers, provider) + retErr = errors.Join(retErr, err) + } + if len(providers) == 0 { + return nil, nil + } + return MultiLimiterWrapperProvider(providers), nil +} + +// Note: MiddlewaresToRateLimiterProvider, MiddlewaresToResourceLimiterProvider +// are needed for special cases, however these functions can be implemented +// manually, they are similar to the above. + +// MiddlewareToLimiterWrapperProvider returns a limiter wrapper +// provider from middleware. Returns a package-level error if the +// middleware does not implement exactly one of the limiter +// interfaces (i.e., rate or resource). +func MiddlewareToLimiterWrapperProvider(host component.Host, middleware configmiddleware.Config) (LimiterWrapperProvider, error) { + ext, ok, err := middlewareIsLimiter(host, middleware) + if err != nil { + return nil, err + } + if ok { + if lim, ok := ext.(extensionlimiter.ResourceLimiterProvider); ok { + return NewResourceLimiterWrapperProvider(lim), nil + } + if lim, ok := ext.(extensionlimiter.RateLimiterProvider); ok { + return NewRateLimiterWrapperProvider(lim), nil + } + } + return nil, fmt.Errorf("%w: %s", ErrNotALimiter, ext) +} + +// middlewareIsLimiter applies consistency checks and returns a valid +// limiter extensions. +func middlewareIsLimiter(host component.Host, middleware configmiddleware.Config) (extension.Extension, bool, error) { + exts := host.GetExtensions() + ext := exts[middleware.ID] + if ext == nil { + return nil, false, fmt.Errorf("%w: %s", ErrUnresolvedLimiter, ext) + } + _, isResource := ext.(extensionlimiter.ResourceLimiterProvider) + _, isRate := ext.(extensionlimiter.RateLimiterProvider) + + switch { + case isResource && isRate: + return nil, false, fmt.Errorf("%w: %s", ErrLimiterConflict, ext) + case isResource, isRate: + return ext, true, nil + default: + return nil, false, nil + } +} + +// MultiLimiterWrapperProvider combines multiple limiter wrappers +// providers into a single provider by sequencing wrapped limiters. +// Returns errors from the underlying LimiterWrapper() calls, if any. +type MultiLimiterWrapperProvider []LimiterWrapperProvider + +var _ LimiterWrapperProvider = MultiLimiterWrapperProvider{} + +// GetLimiterWrapper implements LimiterWrapperProvider, combining +// checkers for all wrappers in a sequence. +func (ps MultiLimiterWrapperProvider) GetChecker(opts ...extensionlimiter.Option) (extensionlimiter.Checker, error) { + var retErr error + var cks MultiChecker + for _, provider := range ps { + ck, err := provider.GetChecker(opts...) + retErr = errors.Join(retErr, err) + if ck == nil { + continue + } + cks = append(cks, ck) + } + if len(cks) == 0 { + return NeverDeny(), retErr + } + return cks, retErr +} + +// GetLimiterWrapper implements LimiterWrapperProvider, calling +// wrappers in a sequence. +func (ps MultiLimiterWrapperProvider) GetLimiterWrapper(key extensionlimiter.WeightKey, opts ...extensionlimiter.Option) (LimiterWrapper, error) { + // Map provider list to limiter list. + var lims []LimiterWrapper + + for _, provider := range ps { + lim, err := provider.GetLimiterWrapper(key, opts...) + if err == nil { + return nil, err + } + if lim == nil { + continue + } + lims = append(lims, lim) + } + + if len(lims) == 0 { + return PassThroughWrapper(), nil + } + + return sequenceLimiters(lims), nil +} + +func sequenceLimiters(lims []LimiterWrapper) LimiterWrapper { + if len(lims) == 1 { + return lims[0] + } + return composeLimiters(lims[0], sequenceLimiters(lims[1:])) +} + +func composeLimiters(first, second LimiterWrapper) LimiterWrapper { + return LimiterWrapperFunc(func(ctx context.Context, value uint64, call func(ctx context.Context) error) error { + return first.LimitCall(ctx, value, func(ctx context.Context) error { + return second.LimitCall(ctx, value, call) + }) + }) +} diff --git a/extension/extensionlimiter/limiterhelper/wrapper.go b/extension/extensionlimiter/limiterhelper/wrapper.go new file mode 100644 index 00000000000..f3b1e2c7f3e --- /dev/null +++ b/extension/extensionlimiter/limiterhelper/wrapper.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package limiterhelper // import "go.opentelemetry.io/collector/extension/extensionlimiter/limiterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/extension/extensionlimiter" +) + +// LimiterWrapperProvider provides access to LimiterWrappers, which is +// the appropriate interface for callers that can easily wrap a +// function call, because for wrapped calls there is no distinction +// between rate limiters and resource limiters. +type LimiterWrapperProvider interface { + extensionlimiter.CheckerProvider + + GetLimiterWrapper(extensionlimiter.WeightKey, ...extensionlimiter.Option) (LimiterWrapper, error) +} + +// GetLimiterWrapperFunc is an easy way to build GetLimiterWrapper functions. +type GetLimiterWrapperFunc func(extensionlimiter.WeightKey, ...extensionlimiter.Option) (LimiterWrapper, error) + +// GetLimiterWrapper implements LimiterWrapperProvider. +func (f GetLimiterWrapperFunc) GetLimiterWrapper(key extensionlimiter.WeightKey, opts ...extensionlimiter.Option) (LimiterWrapper, error) { + if f == nil { + return PassThroughWrapper(), nil + } + return f(key, opts...) +} + +var _ LimiterWrapperProvider = struct { + GetLimiterWrapperFunc + extensionlimiter.GetCheckerFunc +}{} + +// LimiterWrapper is a general-purpose interface for limiter consumers +// to limit resources with use of a callback. This is the simplest +// form of rate limiting interface from a callers perspective. If the +// caller is a pipeline component, consider using a consumer-oriented +// limiterhelper (e.g., limiterhelper.NewLimitedLogs) to simplify +// construction of this interface. +// +// A wrapped limiter is either a RateLimiter or ResourceLimiter +// interface. LimiterWrappers can be constructed from either of the +// underlying limiters and their corresponding providers. Usually +// configmiddleware or limiterhelper is responsible for constructing +// the correct wrapper from these two kinds of limiter; users will use +// this interface consistently. +type LimiterWrapper interface { + // LimitCall applies the limiter and with the rate or resource + // granted makes a scoped call, returning success or an error + // from either the limiter or the enclosed callback. + // + // The `call` parameter must be non-nil. + LimitCall(ctx context.Context, weight uint64, call func(ctx context.Context) error) error +} + +// LimiterWrapperFunc is a functional way to build LimiterWrappers. +type LimiterWrapperFunc func(context.Context, uint64, func(ctx context.Context) error) error + +var _ LimiterWrapper = LimiterWrapperFunc(nil) + +// LimitCall implements LimiterWrapper. +func (f LimiterWrapperFunc) LimitCall(ctx context.Context, value uint64, call func(ctx context.Context) error) error { + if f == nil { + return call(ctx) + } + return f(ctx, value, call) +} + +// PassThroughWrapper returns a LimiterWrapper that imposes no limit. +func PassThroughWrapper() LimiterWrapper { + return LimiterWrapperFunc(nil) +} + +// wrapperProvider is a combinator for building wrapper providers from +// the underlying limter types. +type wrapperProvider struct { + GetLimiterWrapperFunc + extensionlimiter.GetCheckerFunc +} + +// NewResourceLimiterWrapperProvider constructs a +// LimiterWrapperProvider for a resource limiter extension. +func NewResourceLimiterWrapperProvider(rp extensionlimiter.ResourceLimiterProvider) LimiterWrapperProvider { + return wrapperProvider{ + GetCheckerFunc: rp.GetChecker, + GetLimiterWrapperFunc: func(key extensionlimiter.WeightKey, opts ...extensionlimiter.Option) (LimiterWrapper, error) { + lim, err := rp.GetResourceLimiter(key, opts...) + if err == nil { + return nil, err + } + return LimiterWrapperFunc(func(ctx context.Context, value uint64, call func(context.Context) error) error { + release, err := lim.Acquire(ctx, value) + if err != nil { + return err + } + defer release() + return call(ctx) + }), err + }, + } +} + +// NewRateLimiterWrapperProvider constructs a LimiterWrapperProvider +// for a rate limiter extension. +func NewRateLimiterWrapperProvider(rp extensionlimiter.RateLimiterProvider) LimiterWrapperProvider { + return wrapperProvider{ + GetCheckerFunc: rp.GetChecker, + GetLimiterWrapperFunc: func(key extensionlimiter.WeightKey, opts ...extensionlimiter.Option) (LimiterWrapper, error) { + lim, err := rp.GetRateLimiter(key, opts...) + if err == nil { + return nil, err + } + return LimiterWrapperFunc(func(ctx context.Context, value uint64, call func(context.Context) error) error { + if err := lim.Limit(ctx, value); err != nil { + return err + } + return call(ctx) + }), err + }, + } +} diff --git a/extension/extensionlimiter/rate.go b/extension/extensionlimiter/rate.go new file mode 100644 index 00000000000..dcc8d1cc4af --- /dev/null +++ b/extension/extensionlimiter/rate.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extensionlimiter // import "go.opentelemetry.io/collector/extension/extensionlimiter" + +import ( + "context" +) + +// RateLimiterProvider is a provider for rate limiters. +// +// Limiter implementations will implement this or the +// ResourceLimiterProvider interface, but MUST not implement both. +// Limiters are covered by configmiddleware configuration, which is +// able to construct LimiterWrappers from these providers. +type RateLimiterProvider interface { + CheckerProvider + + // GetRateLimiter returns a rate limiter for a weight key. + GetRateLimiter(WeightKey, ...Option) (RateLimiter, error) +} + +// GetRateLimiterFunc is a functional way to construct GetRateLimiter +// functions. +type GetRateLimiterFunc func(WeightKey, ...Option) (RateLimiter, error) + +// RateLimiter implements RateLimiterProvider. +func (f GetRateLimiterFunc) GetRateLimiter(key WeightKey, opts ...Option) (RateLimiter, error) { + if f == nil { + return nil, nil + } + return f(key, opts...) +} + +var _ RateLimiterProvider = struct { + GetRateLimiterFunc + GetCheckerFunc +}{} + +// RateLimiter is an interface that an implementation makes available +// to apply time-based limits on quantities such as the number of +// bytes or items per second. +// +// This is a relatively low-level interface. Callers that can use a +// LimiterWrapper should choose that interface instead. This interface +// is meant for direct use only in special cases where control flow +// cannot be easily scoped to a callback, for example inside +// middleware (e.g., grpc.StatsHandler). +// +// See the README for more recommendations. +type RateLimiter interface { + // Limit attempts to apply rate limiting with the provided + // weight, based on the key that was given to the provider. + // + // This is expected to block the caller until the weight can + // be admitted, or when the limit is completely saturated, + // limiters may also return immediate errors. + Limit(ctx context.Context, value uint64) error +} + +// LimitFunc is a functional way to construct Limit functions. +type LimitFunc func(ctx context.Context, value uint64) error + +// Limit implements part of the RateLimiter interface. +func (f LimitFunc) Limit(ctx context.Context, value uint64) error { + if f == nil { + return nil + } + return f(ctx, value) +} + +var _ RateLimiter = LimitFunc(nil) diff --git a/extension/extensionlimiter/resource.go b/extension/extensionlimiter/resource.go new file mode 100644 index 00000000000..98ba253accf --- /dev/null +++ b/extension/extensionlimiter/resource.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extensionlimiter // import "go.opentelemetry.io/collector/extension/extensionlimiter" + +import ( + "context" +) + +// ResourceLimiterProvider is a provider for resource limiters. +// +// Limiter implementations will implement this or the +// RateLimiterProvider interface, but MUST not implement both. +// Limiters are covered by configmiddleware configuration, which +// is able to construct LimiterWrappers from these providers. +type ResourceLimiterProvider interface { + CheckerProvider + + GetResourceLimiter(WeightKey, ...Option) (ResourceLimiter, error) +} + +// GetResourceLimiterFunc is a functional way to construct +// GetResourceLimiter functions. +type GetResourceLimiterFunc func(WeightKey, ...Option) (ResourceLimiter, error) + +// GetResourceLimiter implements part of ResourceLimiterProvider. +func (f GetResourceLimiterFunc) GetResourceLimiter(key WeightKey, opts ...Option) (ResourceLimiter, error) { + if f == nil { + return nil, nil + } + return f(key, opts...) +} + +var _ ResourceLimiterProvider = struct { + GetResourceLimiterFunc + GetCheckerFunc +}{} + +// ResourceLimiter is an interface that an implementation makes +// available to apply physical limits on quantities such as the number +// of concurrent requests or amount of memory in use. +// +// This is a relatively low-level interface. Callers that can use a +// LimiterWrapper should choose that interface instead. This +// interface is meant for direct use only in special cases where +// control flow is not scoped to a callback, for example in a +// streaming receiver where a limiter might be Acquired in the body of +// Send() and released prior to a corresponding Recv() (e.g., +// OTel-Arrow receiver). +// +// See the README for more recommendations. +type ResourceLimiter interface { + // Acquire attempts to acquire a quantified resource with the + // provided weight, based on the key that was given to the + // provider. The caller has these options: + // + // - Accept and let the request proceed by returning a release func and a nil error + // - Fail and return a non-nil error and a nil release func + // - Block until the resource becomes available, then accept + // - Block until the context times out, return the error. + // + // See the README for more recommendations. + // + // On success, it returns a ReleaseFunc that should be called + // after the resources is no longer in use. + Acquire(ctx context.Context, value uint64) (ReleaseFunc, error) +} + +// ReleaseFunc is called when resources have been released after use. +// +// RelaseFunc values are never nil values, even in the error case, for +// safety. Users may unconditionally defer these. +// +// Implementations are not required to call a release func after +// Acquire(0) is called, since there is nothing to release. +type ReleaseFunc func() + +// AcquireFunc is a functional way to construct Acquire functions. +type AcquireFunc func(ctx context.Context, value uint64) (ReleaseFunc, error) + +// Acquire implements part of ResourceLimiter. +func (f AcquireFunc) Acquire(ctx context.Context, value uint64) (ReleaseFunc, error) { + if f == nil { + return func() {}, nil + } + return f(ctx, value) +} + +var _ ResourceLimiter = AcquireFunc(nil) diff --git a/extension/extensionlimiter/weight.go b/extension/extensionlimiter/weight.go new file mode 100644 index 00000000000..28fcfcf0b9b --- /dev/null +++ b/extension/extensionlimiter/weight.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extensionlimiter // import "go.opentelemetry.io/collector/extension/extensionlimiter" + +import "slices" // WeightKey is an enum type for common rate limits. The +// StandardAllKeys, StandardMiddlewareKeys, and +// StandardNotMiddlewareKeys methods return the list of middleware +// keys that can be automatically configured through middleware and +// not. +type WeightKey string + +// Predefined weight keys for common rate limits. This is not meant +// to be a closed set, new weight keys may be added in the future, +// possibly to restrict other kinds of event (e.g., auths, retries). +// +// Providers should return errors when they do not recognize a weight +// key. +const ( + // WeightKeyNetworkBytes is for network bytes. This is + // typically used with rate limiters. + WeightKeyNetworkBytes WeightKey = "network_bytes" + + // WeightKeyRequestCount can be used to limit the rate or + // total concurrent number of requests (i.e., pipeline data + // objects). This is typically used with both rate and + // resource limiters. + WeightKeyRequestCount WeightKey = "request_count" + + // WeightKeyRequestItems can be used to limit the rate or + // total concurrent number of items (log records, metric data + // points, spans, profiles). This is typically used with both + // rate and resource limiters. + WeightKeyRequestItems WeightKey = "request_items" + + // WeightKeyMemorySize is typically used with ResourceLimiters + // for limiting active memory usage. + WeightKeyMemorySize WeightKey = "memory_size" +) + +// WeightSet are a group of weights to be tested. The purpose of this +// type is to be explicit about a group of weights that have to be +// checked at a certain stage. The receiver and middleware can both +// be responsible for applying limits, and this type helps ensure +// limits are applied only across cooperating sub-components. +type WeightSet []WeightKey + +func (ws WeightSet) Contains(w WeightKey) bool { + return slices.Contains(ws, w) +} + +// StandardAllKeys is all the keys that can be automatically +// implemented by middleware and/or limiterhelper. +func StandardAllKeys() WeightSet { + return WeightSet{ + WeightKeyNetworkBytes, + WeightKeyRequestCount, + WeightKeyRequestItems, + WeightKeyMemorySize, + } +} + +// StandardMiddlewareKeys are typically handled in middleware for +// protocols that support it. Receivers should be careful not to +// re-apply these limits, especially not to twice-limit by +// WeightKeyRequestItems. +func StandardMiddlewareKeys() WeightSet { + return WeightSet{ + WeightKeyNetworkBytes, + WeightKeyRequestCount, + } +} + +// StandardNotMiddlewareKeys are the keys that are typically not +// handled through middlware because they are protocol specific and +// generally easier to handle after the input has become pdata. +func StandardNotMiddlewareKeys() WeightSet { + return WeightSet{ + WeightKeyRequestItems, + WeightKeyMemorySize, + } +} diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index 67884277e80..140b4013393 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -95,7 +95,8 @@ require ( go.opentelemetry.io/collector/exporter/xexporter v0.126.0 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.32.0 // indirect go.opentelemetry.io/collector/extension/extensioncapabilities v0.126.0 // indirect - go.opentelemetry.io/collector/extension/extensionmiddleware v0.126.0 // indirect + go.opentelemetry.io/collector/extension/extensionlimiter v0.0.0-00010101000000-000000000000 // indirect + go.opentelemetry.io/collector/extension/extensionmiddleware v1.30.0 // indirect go.opentelemetry.io/collector/extension/extensiontest v0.126.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.126.0 // indirect go.opentelemetry.io/collector/featuregate v1.32.0 // indirect @@ -270,3 +271,5 @@ replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmid replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware + +replace go.opentelemetry.io/collector/extension/extensionlimiter => ../../extension/extensionlimiter diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index d4b715452d8..214d2886b45 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -22,6 +22,7 @@ require ( go.opentelemetry.io/collector/consumer/consumererror v0.126.0 go.opentelemetry.io/collector/consumer/consumertest v0.126.0 go.opentelemetry.io/collector/consumer/xconsumer v0.126.0 + go.opentelemetry.io/collector/extension/extensionlimiter v0.0.0-00010101000000-000000000000 go.opentelemetry.io/collector/internal/sharedcomponent v0.126.0 go.opentelemetry.io/collector/internal/telemetry v0.126.0 go.opentelemetry.io/collector/pdata v1.32.0 @@ -69,8 +70,9 @@ require ( go.opentelemetry.io/collector/client v1.32.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.32.0 // indirect go.opentelemetry.io/collector/config/configmiddleware v0.126.0 // indirect + go.opentelemetry.io/collector/extension v1.32.0 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.32.0 // indirect - go.opentelemetry.io/collector/extension/extensionmiddleware v0.126.0 // indirect + go.opentelemetry.io/collector/extension/extensionmiddleware v1.30.0 // indirect go.opentelemetry.io/collector/featuregate v1.32.0 // indirect go.opentelemetry.io/collector/pipeline v0.126.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect @@ -160,6 +162,8 @@ replace go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware +replace go.opentelemetry.io/collector/extension/extensionlimiter => ../../extension/extensionlimiter + replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 7e5ab5d4b30..288be4c3e7f 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -18,6 +18,8 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/extension/extensionlimiter" + "go.opentelemetry.io/collector/extension/extensionlimiter/limiterhelper" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/internal/telemetry/componentattribute" "go.opentelemetry.io/collector/pdata/plog/plogotlp" @@ -97,20 +99,46 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error { return err } + limitKeys := extensionlimiter.StandardNotMiddlewareKeys() + limiterProvider, err := limiterhelper.MiddlewaresToLimiterWrapperProvider(host, r.cfg.GRPC.Middlewares) + if err != nil { + return err + } + if r.nextTraces != nil { - ptraceotlp.RegisterGRPCServer(r.serverGRPC, trace.New(r.nextTraces, r.obsrepGRPC)) + var next consumer.Traces + next, err = limiterhelper.NewLimitedTraces(r.nextTraces, limitKeys, limiterProvider) + if err != nil { + return err + } + ptraceotlp.RegisterGRPCServer(r.serverGRPC, trace.New(next, r.obsrepGRPC)) } if r.nextMetrics != nil { - pmetricotlp.RegisterGRPCServer(r.serverGRPC, metrics.New(r.nextMetrics, r.obsrepGRPC)) + var next consumer.Metrics + next, err = limiterhelper.NewLimitedMetrics(r.nextMetrics, limitKeys, limiterProvider) + if err != nil { + return err + } + pmetricotlp.RegisterGRPCServer(r.serverGRPC, metrics.New(next, r.obsrepGRPC)) } if r.nextLogs != nil { - plogotlp.RegisterGRPCServer(r.serverGRPC, logs.New(r.nextLogs, r.obsrepGRPC)) + var next consumer.Logs + next, err = limiterhelper.NewLimitedLogs(r.nextLogs, limitKeys, limiterProvider) + if err != nil { + return err + } + plogotlp.RegisterGRPCServer(r.serverGRPC, logs.New(next, r.obsrepGRPC)) } if r.nextProfiles != nil { - pprofileotlp.RegisterGRPCServer(r.serverGRPC, profiles.New(r.nextProfiles)) + var next xconsumer.Profiles + next, err = limiterhelper.NewLimitedProfiles(r.nextProfiles, limitKeys, limiterProvider) + if err != nil { + return err + } + pprofileotlp.RegisterGRPCServer(r.serverGRPC, profiles.New(next)) } r.settings.Logger.Info("Starting GRPC server", zap.String("endpoint", r.cfg.GRPC.NetAddr.Endpoint)) @@ -136,36 +164,57 @@ func (r *otlpReceiver) startHTTPServer(ctx context.Context, host component.Host) return nil } + limitKeys := extensionlimiter.StandardNotMiddlewareKeys() + limiterProvider, err := limiterhelper.MiddlewaresToLimiterWrapperProvider(host, r.cfg.HTTP.ServerConfig.Middlewares) + if err != nil { + return err + } + httpMux := http.NewServeMux() if r.nextTraces != nil { - httpTracesReceiver := trace.New(r.nextTraces, r.obsrepHTTP) + next, err := limiterhelper.NewLimitedTraces(r.nextTraces, limitKeys, limiterProvider) + if err != nil { + return err + } + httpTracesReceiver := trace.New(next, r.obsrepHTTP) httpMux.HandleFunc(r.cfg.HTTP.TracesURLPath, func(resp http.ResponseWriter, req *http.Request) { handleTraces(resp, req, httpTracesReceiver) }) } if r.nextMetrics != nil { - httpMetricsReceiver := metrics.New(r.nextMetrics, r.obsrepHTTP) + next, err := limiterhelper.NewLimitedMetrics(r.nextMetrics, limitKeys, limiterProvider) + if err != nil { + return err + } + httpMetricsReceiver := metrics.New(next, r.obsrepHTTP) httpMux.HandleFunc(r.cfg.HTTP.MetricsURLPath, func(resp http.ResponseWriter, req *http.Request) { handleMetrics(resp, req, httpMetricsReceiver) }) } if r.nextLogs != nil { - httpLogsReceiver := logs.New(r.nextLogs, r.obsrepHTTP) + next, err := limiterhelper.NewLimitedLogs(r.nextLogs, limitKeys, limiterProvider) + if err != nil { + return err + } + httpLogsReceiver := logs.New(next, r.obsrepHTTP) httpMux.HandleFunc(r.cfg.HTTP.LogsURLPath, func(resp http.ResponseWriter, req *http.Request) { handleLogs(resp, req, httpLogsReceiver) }) } if r.nextProfiles != nil { - httpProfilesReceiver := profiles.New(r.nextProfiles) + next, err := limiterhelper.NewLimitedProfiles(r.nextProfiles, limitKeys, limiterProvider) + if err != nil { + return err + } + httpProfilesReceiver := profiles.New(next) httpMux.HandleFunc(defaultProfilesURLPath, func(resp http.ResponseWriter, req *http.Request) { handleProfiles(resp, req, httpProfilesReceiver) }) } - var err error if r.serverHTTP, err = r.cfg.HTTP.ServerConfig.ToServer(ctx, host, r.settings.TelemetrySettings, httpMux, confighttp.WithErrorHandler(errorHandler)); err != nil { return err }