Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions extension/memorylimiterextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/extension/xextension v0.121.0 // indirect
go.opentelemetry.io/collector/featuregate v1.27.0 // indirect
go.opentelemetry.io/collector/pdata v1.27.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
Expand Down Expand Up @@ -68,4 +69,6 @@ replace go.opentelemetry.io/collector/internal/memorylimiter => ../../internal/m

replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest

replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension

replace go.opentelemetry.io/collector/featuregate => ../../featuregate
25 changes: 25 additions & 0 deletions extension/memorylimiterextension/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/xextension/limit"
"go.opentelemetry.io/collector/internal/memorylimiter"
)

type memoryLimiterExtension struct {
memLimiter *memorylimiter.MemoryLimiter
}

var _ limit.Extension = &memoryLimiterExtension{}

// newMemoryLimiter returns a new memorylimiter extension.
func newMemoryLimiter(cfg *Config, logger *zap.Logger) (*memoryLimiterExtension, error) {
ml, err := memorylimiter.NewMemoryLimiter(cfg, logger)
Expand All @@ -35,6 +38,28 @@ func (ml *memoryLimiterExtension) Shutdown(ctx context.Context) error {
}

// MustRefuse returns if the caller should deny because memory has reached it's configured limits
//
// It's not clear that this is used anywhere, but as a legacy exported
// function some component could observe it disappear, so it has to stay.
func (ml *memoryLimiterExtension) MustRefuse() bool {
return ml.memLimiter.MustRefuse()
}

// client implements Client. Note that the component Kind and ID are
// not used, so this struct is identical to *memoryLimiterExtension.
type client struct {
ml *memoryLimiterExtension
}

// GetClient implements limit.Extension.
func (ml *memoryLimiterExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID) (limit.Client, error) {
return &client{ml}, nil
}

// Acquire implements limit.Client.
func (c *client) Acquire(_ context.Context, _ uint64) (limit.ReleaseFunc, error) {
if c.ml.memLimiter.MustRefuse() {
return nil, memorylimiter.ErrDataRefused
}
return func() {}, nil
}
16 changes: 16 additions & 0 deletions extension/memorylimiterextension/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
Expand Down Expand Up @@ -91,6 +92,21 @@ func TestMemoryPressureResponse(t *testing.T) {
} else {
require.NoError(t, err)
}

// Use the limit extension API.
lclient, err := ml.GetClient(ctx, component.KindReceiver, component.MustNewID("testing"))
assert.NoError(t, err)
nf, err := lclient.Acquire(ctx, 1)

if tt.expectError {
assert.Error(t, err)
assert.Nil(t, nf)
} else {
assert.NoError(t, err)
assert.NotNil(t, nf)
nf()
}

assert.NoError(t, ml.Shutdown(ctx))
})
}
Expand Down
4 changes: 4 additions & 0 deletions extension/xextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ module go.opentelemetry.io/collector/extension/xextension
go 1.23.0

require (
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v1.27.0
go.opentelemetry.io/collector/extension v1.27.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/pdata v1.27.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
Expand All @@ -21,6 +24,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/extension => ../
Expand Down
9 changes: 9 additions & 0 deletions extension/xextension/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions extension/xextension/limit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Storage

**Status: under development**

A limit extension supports flexible methods for admission and rate
control. Other components, typically receivers, can request a limit
client from the limit extension and use it to admit requests.

This interface is byte-weight oriented, enabling limiting for
individual payloads within a stream-oriented RPC. For request-level
limiting, consider using the Auth extension instead.

The `limit.Extension` interface extends `component.Extension` by
adding the following method:

```
GetClient(context.Context, component.Kind, component.ID, string) (Client, error)
```

After the context argument are two component-level identifiers and a
string, which is the name of the extension. Typically, components
will support a list of named limiters to apply in sequence, e.g.,

```
receivers:
someprotocol:
limiters:
- rate
- memory
```

The `limit.Client` interface contains the following method:
```
Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error)
```

The weight parameter specifies how large of an request to consider in
the limiter, which should estimate the amount of real memory used by
to represent the data after it is uncompressed.

The result is either a non-nil release function and nil error, or a
nil release function and non-nil error. The limiter may block or fail
fast, at its discretion. When a non-nil release function is returned,
the component is responsible for calling the release function after
the memory is no longer used.

Note: It is the responsibility of each component to `Close` a storage
client that it has requested.
6 changes: 6 additions & 0 deletions extension/xextension/limit/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package limit implements an extension that can limit requests by
// blocking or failing them through calls to acquire and release.
package limit // import "go.opentelemetry.io/collector/extension/xextension/limit"
70 changes: 70 additions & 0 deletions extension/xextension/limit/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package limit // import "go.opentelemetry.io/collector/extension/xextension/limit"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on auth case, I think this should be consistent and be named extensionlimit


import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
)

// Extension is the interface that storage extensions must implement
type Extension interface {
extension.Extension

// GetClient will create a client for use by the specified
// component. The component can use the client to limit
// admission to the pipeline.
GetClient(ctx context.Context, kind component.Kind, id component.ID) (Client, error)
}
Comment on lines +14 to +21
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions/asks here:

  1. Need to be consistent with decision in [extensionauth] Split extensionauth.Client by protocol type #12574, and not include the extension.Extension into this.
  2. Do we need the "extra" client interface? Asking because we don't follow this pattern in extensionauth but we do this in storage extension. If we need that (there may be good reasons to have it), I would like to make the auth follow the same pattern. cc @mx-psi

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes more sense to me, without further context, to use the approach on my PR: if multiple instances of the client are required then the user can define multiple extensions (limiter/1, limiter/2...)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mean to be inconsistent with #12574, sorry. That approach looks fine to me.

As noted, the "extra" interface appears valuable to me in the sense that static context (e.g., component metadata) can be used to specialize the limiting conditions at start time similar to x/storage. See my comment in #12603 (comment) on this topic. Today's configgrpc and confighttp helpers do not include the component metadata, so to be realistic I removed the extra parameters. If we remove the extra parameters and do not arrange to pass component metadata (e.g., signal kind, component ID) via context, then there is no reason for the extra interface.

See I opened several drafts--the copy you are commenting on, here, is an older iteration of mine, where I tried to follow the x/storage model before I realized it would not work. In #12600 the interface became simply GetLimiter(ctx), see https://github.com/open-telemetry/opentelemetry-collector/pull/12600/files#diff-8be8b8c994bb03af15a8c1a7cc8953d44053c7b40726d0ec99f365ca29eee608R19.


// Client implements a limiter for byte-weighted request admission.
type Client interface {
// Acquire asks the controller to admit the caller.
//
// The weight parameter specifies how large of an request to
// consider in the limiter. This should be a measure of the
// size of the request after it is uncompressed. One way to
// compute the weight of a request is byte its OTLP encoding
// size using the appropriate pdata `Sizer` type. However,
// components are encouraged to use an approximation, not
// necessarily based on the OTLP encoding size, if there is an
// efficient substitute.
//
// The goal is for the estimated weight to approximate the
// amount of real memory that will be occupied while the
// request is in flight. If the component uses a protocol
// with less repetition than OTLP, it is possible for the
// Sizer to overestimate the amount of real memory used,
// because Golang's immutable string value allows
// it. Therefore, components should prefer an inexpensive,
// approximate method to determine weight in bytes.
//
// The limiter is permitted to block the request. After making
// its decision, the return value is exclusively a function
// value or an error. Admit will return when one of the
// following events occurs:
//
// (1) admission is allowed, or
// (2) the provided ctx becomes canceled, or
// (3) the limiter determines that the request should fail.
Comment on lines +51 to +52
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, in this case returned err is not nil.

//
// In case (1), the return value will be a non-nil
// ReleaseFunc. The caller must invoke it after it is finished
// with the resource being guarded by the admission
// controller.
//
// In case (2), the return value should be similar to:
// - gRPC: Cancelled or DeadlineExceeded error
// - HTTP: Status 408.
//
// In case (3), the return value should be similar to:
// - gRPC: ResourceExhausted error
// - HTTP: Status 429.
Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error)
}

// ReleaseFunc is returned by Acquire when the Acquire() was admitted.
type ReleaseFunc func()
19 changes: 19 additions & 0 deletions extension/xextension/limit/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package limit

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestNopLimiter(t *testing.T) {
ctx := context.Background()
nop := NewNopClient()
rel, err := nop.Acquire(ctx, 1000)
require.NoError(t, err)
rel()
}
10 changes: 10 additions & 0 deletions extension/xextension/limit/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
type: xextension
github_project: open-telemetry/opentelemetry-collector

status:
class: extension
codeowners:
active:
- jmacd
stability:
development: [traces, metrcs, logs, profiles]
20 changes: 20 additions & 0 deletions extension/xextension/limit/nop_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package limit // import "go.opentelemetry.io/collector/extension/xextension/limit"

import "context"

type nopClient struct{}

var nopClientInstance Client = &nopClient{}

// NewNopClient returns a nop client
func NewNopClient() Client {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually have this in a limittest package/module. Please follow that model.

return nopClientInstance
}

// Acquire implements Client.
func (nopClient) Acquire(_ context.Context, _ uint64) (ReleaseFunc, error) {
return func() {}, nil
}