Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blooms): Add task timining and sizing metrics #15032

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Metrics struct {
tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
tenantTasksCompleted *prometheus.GaugeVec
tenantTasksTiming *prometheus.HistogramVec

// Retention metrics
retentionRunning prometheus.Gauge
Expand Down Expand Up @@ -166,6 +167,14 @@ func NewMetrics(
Name: "tenant_tasks_completed",
Help: "Number of tasks completed for a tenant during the current build iteration.",
}, []string{"tenant", "status"}),
tenantTasksTiming: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
chaudum marked this conversation as resolved.
Show resolved Hide resolved
Subsystem: metricsSubsystem,
Name: "tenant_tasks_time_seconds",
Help: "Time spent building tasks for a tenant during the current build iteration.",
// 1s --> 1h (steps of 1 minute)
Buckets: prometheus.LinearBuckets(1, 60, 60),
}, []string{"tenant", "status"}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Expand Down
30 changes: 18 additions & 12 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type Planner struct {
tsdbStore common.TSDBStore
bloomStore bloomshipper.StoreBase

tasksQueue *queue.Queue
tasksQueue *queue.Queue
planFactory *strategies.Factory

metrics *Metrics
logger log.Logger
Expand Down Expand Up @@ -86,14 +87,15 @@ func New(
}

p := &Planner{
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
planFactory: strategies.NewFactory(limits, strategies.NewMetrics(r), logger),
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
}

p.retentionManager = NewRetentionManager(
Expand Down Expand Up @@ -370,7 +372,7 @@ func (p *Planner) computeTasks(
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
strategy, err := p.planFactory.GetStrategy(tenant)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
}
Expand Down Expand Up @@ -770,8 +772,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
continue
}

startTime := time.Now()
result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
p.metrics.tenantTasksTiming.WithLabelValues(task.Tenant, statusFailure).Observe(time.Since(startTime).Seconds())
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.tasksQueue.Release(task.ProtoTask)
Expand Down Expand Up @@ -811,10 +815,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

level.Debug(logger).Log(
"msg", "task completed",
"duration", time.Since(task.queueTime).Seconds(),
"timeSinceEnqueued", time.Since(task.queueTime).Seconds(),
"buildTime", time.Since(startTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.tasksQueue.Release(task.ProtoTask)
p.metrics.tenantTasksTiming.WithLabelValues(task.Tenant, statusSuccess).Observe(time.Since(startTime).Seconds())

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
Expand Down Expand Up @@ -866,7 +872,7 @@ func (p *Planner) forwardTaskToBuilder(
case err := <-errCh:
return nil, err
case <-timeout:
return nil, fmt.Errorf("timeout waiting for response from builder (%s)", builderID)
return nil, fmt.Errorf("timeout (%s) waiting for response from builder (%s)", taskTimeout, builderID)
}
}

Expand Down
37 changes: 32 additions & 5 deletions pkg/bloombuild/planner/strategies/chunksize.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand All @@ -21,22 +22,47 @@
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

const (
metricsNamespace = "loki"
metricsSubsystem = "bloomplanner"
)

type ChunkSizeStrategyMetrics struct {
tenantTaskSize *prometheus.HistogramVec
}

func NewChunkSizeStrategyMetrics(reg prometheus.Registerer) *ChunkSizeStrategyMetrics {

Check warning on line 34 in pkg/bloombuild/planner/strategies/chunksize.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'reg' seems to be unused, consider removing or renaming it as _ (revive)
return &ChunkSizeStrategyMetrics{
tenantTaskSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
chaudum marked this conversation as resolved.
Show resolved Hide resolved
Subsystem: metricsSubsystem,
Name: "tenant_task_size_bytes",
Help: "Size of tasks generated by the chunk size strategy",
// 1GB --> 512GB
Buckets: prometheus.ExponentialBuckets(1e9, 2, 10),
}, []string{"tenant"}),
}
}

type ChunkSizeStrategyLimits interface {
BloomTaskTargetSeriesChunksSizeBytes(tenantID string) uint64
}

type ChunkSizeStrategy struct {
limits ChunkSizeStrategyLimits
logger log.Logger
limits ChunkSizeStrategyLimits
metrics *ChunkSizeStrategyMetrics
logger log.Logger
}

func NewChunkSizeStrategy(
limits ChunkSizeStrategyLimits,
metrics *ChunkSizeStrategyMetrics,
logger log.Logger,
) (*ChunkSizeStrategy, error) {
return &ChunkSizeStrategy{
limits: limits,
logger: logger,
limits: limits,
metrics: metrics,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -82,8 +108,9 @@
continue
}

bounds := series.Bounds()
s.metrics.tenantTaskSize.WithLabelValues(tenant).Observe(float64(series.Size()))

bounds := series.Bounds()
blocks, err := getBlocksMatchingBounds(metas, bounds)
if err != nil {
return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/strategies/chunksize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
Expand Down Expand Up @@ -228,7 +229,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

strategy, err := NewChunkSizeStrategy(tc.limits, logger)
strategy, err := NewChunkSizeStrategy(tc.limits, NewChunkSizeStrategyMetrics(prometheus.NewPedanticRegistry()), logger)
require.NoError(t, err)

actual, err := strategy.Plan(context.Background(), plannertest.TestTable, "fake", tc.tsdbs, tc.originalMetas)
Expand Down
37 changes: 31 additions & 6 deletions pkg/bloombuild/planner/strategies/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
Expand Down Expand Up @@ -32,18 +33,42 @@ type PlanningStrategy interface {
Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*protos.Task, error)
}

func NewStrategy(
tenantID string,
type Metrics struct {
*ChunkSizeStrategyMetrics
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
return &Metrics{
ChunkSizeStrategyMetrics: NewChunkSizeStrategyMetrics(reg),
}
}

type Factory struct {
limits Limits
logger log.Logger
metrics *Metrics
}

func NewFactory(
limits Limits,
metrics *Metrics,
logger log.Logger,
) (PlanningStrategy, error) {
strategy := limits.BloomPlanningStrategy(tenantID)
) *Factory {
return &Factory{
limits: limits,
logger: logger,
metrics: metrics,
}
}

func (f *Factory) GetStrategy(tenantID string) (PlanningStrategy, error) {
strategy := f.limits.BloomPlanningStrategy(tenantID)

switch strategy {
case SplitKeyspaceStrategyName:
return NewSplitKeyspaceStrategy(limits, logger)
return NewSplitKeyspaceStrategy(f.limits, f.logger)
case SplitBySeriesChunkSizeStrategyName:
return NewChunkSizeStrategy(limits, logger)
return NewChunkSizeStrategy(f.limits, f.metrics.ChunkSizeStrategyMetrics, f.logger)
default:
return nil, fmt.Errorf("unknown bloom planning strategy (%s)", strategy)
}
Expand Down
Loading