diff --git a/helper/streams/streams.go b/helper/streams/streams.go index 033cd980..d61ae787 100644 --- a/helper/streams/streams.go +++ b/helper/streams/streams.go @@ -68,9 +68,9 @@ func Concurrent[T any](c chan T) func(context.Context, ...T) error { // NewConcurrentContext does the same as NewConcurrent, but uses the provided // Context for every push call. -func NewConcurrentContext[T any](ctx context.Context, vals ...T) (_ <-chan T, _push func(context.Context, ...T) error, _close func()) { +func NewConcurrentContext[T any](ctx context.Context, vals ...T) (_ <-chan T, _push func(...T) error, _close func()) { out, push, cls := NewConcurrent(vals...) - return out, func(_ context.Context, vals ...T) error { + return out, func(vals ...T) error { return push(ctx, vals...) }, cls }