Skip to content

Commit

Permalink
feat: improve async tee request processing
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Aug 2, 2024
1 parent 3dcc50d commit 2f5485d
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 26 deletions.
2 changes: 2 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ func (d *Distributor) running(ctx context.Context) error {
}

func (d *Distributor) stopping(_ error) error {
d.tee.Stop()

return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/distributor/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package distributor
// Tee implementations can duplicate the log streams to another endpoint.
type Tee interface {
Duplicate(tenant string, streams []KeyedStream)
Stop()
}

// WrapTee wraps a new Tee around an existing Tee.
Expand All @@ -25,3 +26,9 @@ func (m *multiTee) Duplicate(tenant string, streams []KeyedStream) {
tee.Duplicate(tenant, streams)
}
}

func (m *multiTee) Stop() {
for _, tee := range m.tees {
tee.Stop()
}
}
4 changes: 4 additions & 0 deletions pkg/ingester-rf1/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,7 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
t.ingesterAppends.WithLabelValues(addr, "success").Inc()
return nil
}

func (t *Tee) Stop() {
// noop
}
4 changes: 0 additions & 4 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ type Push struct {

// shutdown channels
quit chan struct{}
done chan struct{}

// auth
username, password string
Expand Down Expand Up @@ -135,7 +134,6 @@ func NewPush(
password: password,
logger: logger,
quit: make(chan struct{}),
done: make(chan struct{}),
backoff: backoffCfg,
entries: entries{
entries: make([]entry, 0),
Expand All @@ -155,7 +153,6 @@ func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) {
func (p *Push) Stop() {
if p.quit != nil {
close(p.quit)
<-p.done
p.quit = nil
}
}
Expand Down Expand Up @@ -215,7 +212,6 @@ func (p *Push) run(pushPeriod time.Duration) {

defer func() {
pushTicker.Stop()
close(p.done)
}()

for {
Expand Down
49 changes: 44 additions & 5 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Config struct {
MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`
MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
TeeParallelism int `yaml:"tee_parallelism,omitempty" doc:"description=The number of parallel goroutines to use for forwarding requests to the pattern ingester."`
TeeBufferSize int `yaml:"tee_buffer_size,omitempty" doc:"Maxiumum number of pending teed request to pattern ingesters. If the buffer is full the request is dropped."`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
Expand All @@ -51,11 +53,48 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger)
cfg.ClientConfig.RegisterFlags(fs)
cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")
fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.")
fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.")
fs.BoolVar(
&cfg.Enabled,
"pattern-ingester.enabled",
false,
"Flag to enable or disable the usage of the pattern-ingester component.",
)
fs.IntVar(
&cfg.ConcurrentFlushes,
"pattern-ingester.concurrent-flushes",
32,
"How many flushes can happen concurrently from each stream.",
)
fs.DurationVar(
&cfg.FlushCheckPeriod,
"pattern-ingester.flush-check-period",
1*time.Minute,
"How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.",
)
fs.IntVar(
&cfg.MaxClusters,
"pattern-ingester.max-clusters",
drain.DefaultConfig().MaxClusters,
"The maximum number of detected pattern clusters that can be created by the pattern ingester.",
)
fs.Float64Var(
&cfg.MaxEvictionRatio,
"pattern-ingester.max-eviction-ratio",
drain.DefaultConfig().MaxEvictionRatio,
"The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.",
)
fs.IntVar(
&cfg.TeeParallelism,
"pattern-ingester.tee-parallelism",
5,
"The number of parallel goroutines to use for forwarding requests to the pattern ingester.",
)
fs.IntVar(
&cfg.TeeBufferSize,
"pattern-ingester.tee-buffer-size",
100,
"Maxiumum number of pending teed request to pattern ingesters. If the buffer is full the request is dropped.",
)
}

func (cfg *Config) Validate() error {
Expand Down
87 changes: 70 additions & 17 deletions pkg/pattern/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ type Tee struct {

ingesterAppends *prometheus.CounterVec
fallbackIngesterAppends *prometheus.CounterVec

teedRequests *prometheus.CounterVec

requestCh chan request

// shutdown channel
quit chan struct{}
}

type request struct {
tenant string
stream distributor.KeyedStream
}

func NewTee(
Expand All @@ -43,34 +55,53 @@ func NewTee(
Name: "pattern_ingester_fallback_appends_total",
Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
}, []string{"ingester", "status"}),
teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "pattern_ingester_teed_requests_total",
Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
}, []string{"tenant", "status"}),
cfg: cfg,
ringClient: ringClient,
requestCh: make(chan request, cfg.TeeBufferSize),
quit: make(chan struct{}),
}

for i := 0; i < cfg.TeeParallelism; i++ {
go t.run()
}

return t, nil
}

// Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters.
func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
for idx := range streams {
go func(stream distributor.KeyedStream) {
if err := t.sendStream(tenant, stream); err != nil {
func (t *Tee) run() {
for {
select {
case <-t.quit:
return
case req := <-t.requestCh:
ctx, cancel := context.WithTimeout(
user.InjectOrgID(context.Background(), req.tenant),
t.cfg.ClientConfig.RemoteTimeout,
)
defer cancel()

if err := t.sendStream(ctx, req.tenant, req.stream); err != nil {
level.Error(t.logger).Log("msg", "failed to send stream to pattern ingester", "err", err)
}
}(streams[idx])
}
}
}

func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
err := t.sendOwnedStream(tenant, stream)
func (t *Tee) sendStream(ctx context.Context, tenant string, stream distributor.KeyedStream) error {
err := t.sendOwnedStream(ctx, tenant, stream)
if err == nil {
// Success, return early
return nil
}

// Pattern ingesters serve 2 functions, processing patterns and aggregating metrics.
// Only owned streams are processed for patterns, however any pattern ingester can
// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
// try to send it to any pattern ingester so we at least capture the metrics.
// try to forward request to any pattern ingester so we at least capture the metrics.
replicationSet, err := t.ringClient.Ring().GetAllHealthy(ring.Read)
if replicationSet.Instances == nil {
return errors.New("no instances found")
Expand All @@ -86,11 +117,6 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
},
}

ctx, cancel := context.WithTimeout(
user.InjectOrgID(context.Background(), tenant),
t.cfg.ClientConfig.RemoteTimeout,
)
defer cancel()
_, err = client.(logproto.PatternClient).Push(ctx, req)
if err != nil {
t.fallbackIngesterAppends.WithLabelValues(addr, "fail").Inc()
Expand All @@ -105,7 +131,7 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
return err
}

func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) error {
func (t *Tee) sendOwnedStream(ctx context.Context, tenant string, stream distributor.KeyedStream) error {

Check warning on line 134 in pkg/pattern/tee.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'tenant' seems to be unused, consider removing or renaming it as _ (revive)
var descs [1]ring.InstanceDesc
replicationSet, err := t.ringClient.Ring().Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
if err != nil {
Expand All @@ -125,8 +151,6 @@ func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) err
},
}

ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), t.cfg.ClientConfig.RemoteTimeout)
defer cancel()
_, err = client.(logproto.PatternClient).Push(ctx, req)
if err != nil {
t.ingesterAppends.WithLabelValues(addr, "fail").Inc()
Expand All @@ -135,3 +159,32 @@ func (t *Tee) sendOwnedStream(tenant string, stream distributor.KeyedStream) err
t.ingesterAppends.WithLabelValues(addr, "success").Inc()
return nil
}

// Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters.
func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
for idx := range streams {
go func(stream distributor.KeyedStream) {
req := request{
tenant: tenant,
stream: stream,
}

// We need to prioritize protecting distributors to prevent bigger problems to the system, so
// we respond to backpressure by dropping requests if the channel is full
select {
case t.requestCh <- req:
t.teedRequests.WithLabelValues(tenant, "queued").Inc()
return
default:
t.teedRequests.WithLabelValues(tenant, "dropped").Inc()
return
}
}(streams[idx])
}
}

// Stop will cancel any ongoing requests and stop the goroutine listening for requests
func (t *Tee) Stop() {
close(t.quit)
t.requestCh = make(chan request, t.cfg.TeeBufferSize)
}

0 comments on commit 2f5485d

Please sign in to comment.