Block-builder: pull jobs from scheduler #10118

Merged: 79 commits, Jan 18, 2025
Changes shown from 70 commits

Commits (79)
f457910
Merge from davidgrant/block-builder-scheduler-reconcile
seizethedave Nov 18, 2024
8f9d996
Don't need to get fancy with critical section: no contention is possi…
seizethedave Nov 18, 2024
6712985
Document s.committed.
seizethedave Nov 18, 2024
f97e4cb
Merge
seizethedave Nov 24, 2024
4cfeed9
Fix merge slop.
seizethedave Nov 24, 2024
eaed668
Add proto defs.
seizethedave Nov 4, 2024
d37b1f2
Initial RPC protos.
seizethedave Nov 4, 2024
35d1868
Log an error if flushing fails.
seizethedave Nov 24, 2024
81dee72
Include RPC forms of assignJob/updateJob.
seizethedave Nov 4, 2024
3772a12
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Nov 25, 2024
7c3f2da
Fleshing out the client module.
seizethedave Nov 25, 2024
db44d45
Appease the linter.
seizethedave Nov 25, 2024
5cc39ce
workerID param not used.
seizethedave Nov 25, 2024
3c6a566
CompleteJob: no context needed.
seizethedave Nov 25, 2024
91b8e83
The linter.
seizethedave Nov 25, 2024
7355de9
More logs.
seizethedave Nov 25, 2024
c34617d
Fix circular reference. Initialize client.
seizethedave Nov 29, 2024
9f67caa
schedulerproto -> schedulerpb
seizethedave Nov 29, 2024
47b5662
merge
seizethedave Nov 29, 2024
188217f
Fix compile stuff.
seizethedave Nov 30, 2024
179018e
Getting builder/scheduler up in docker-compose.
seizethedave Nov 30, 2024
f789447
Clean unneeded flags.
seizethedave Nov 30, 2024
bad7dc2
Fixes for BB<>scheduler communications:
seizethedave Nov 30, 2024
762012f
Fix block-builder's scheduler_config so it can connect to a scheduler.
seizethedave Nov 30, 2024
671f4b3
Shorter update interval for testing.
seizethedave Nov 30, 2024
f760fa8
Fix unloggable key.
seizethedave Nov 30, 2024
df7e43e
Longer job working time.
seizethedave Nov 30, 2024
6ad943c
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Dec 2, 2024
63313b0
Undo blockbuilder.go hacking.
seizethedave Dec 2, 2024
509e9a8
Peel off block-builder changes not ready for this PR.
seizethedave Dec 2, 2024
fcdd240
Fix log message when creating a new job.
seizethedave Dec 2, 2024
77b00ac
Add some tests for scheduler client module. And fix some things found…
seizethedave Dec 2, 2024
6ed93cc
Linter.
seizethedave Dec 2, 2024
23e0db2
Lint: AGPL header.
seizethedave Dec 2, 2024
fa14a29
More info in the SchedulerClient comment.
seizethedave Dec 2, 2024
54f1af4
Remove superfluous comment.
seizethedave Dec 2, 2024
02d7856
Defer Kafka offset flushing to a later PR.
seizethedave Dec 2, 2024
b175c80
Revert "Undo blockbuilder.go hacking."
seizethedave Dec 3, 2024
b419dd7
Revert "Peel off block-builder changes not ready for this PR."
seizethedave Dec 3, 2024
cb250cd
Separate run functions for standalone, pull modes.
seizethedave Dec 3, 2024
5e53ac8
Working on a separate codepath for 'pull mode'.
seizethedave Dec 4, 2024
9944af4
don't need PartitionState.Clone.
seizethedave Dec 4, 2024
ecafbf5
Better startup initialization. Restore the assigned partition validat…
seizethedave Dec 4, 2024
5af6624
Lint.
seizethedave Dec 4, 2024
f17ee08
More changes to blockbuilder to support testing. Fix terminating loop…
seizethedave Dec 5, 2024
f980d09
Working on a pull-mode consumption test.
seizethedave Dec 5, 2024
815559f
merge
seizethedave Dec 19, 2024
9d238a2
Fix build.
seizethedave Dec 19, 2024
727f684
lint
seizethedave Dec 19, 2024
3b766a0
Add CycleEndTs, CycleEndOffset fields to JobSpec.
seizethedave Dec 20, 2024
d42f7bb
Use new JobSpec fields in consumeJob
seizethedave Dec 20, 2024
4281f8c
Fix pull-mode test.
seizethedave Dec 20, 2024
23024bc
Make proto fields consistent.
seizethedave Dec 20, 2024
f9fa09d
Re-fix test.
seizethedave Dec 20, 2024
47f30b9
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Dec 20, 2024
3c873ef
Remove erroneous provenance headers.
seizethedave Dec 20, 2024
32aaa02
Pull-mode-ize another block-builder test.
seizethedave Dec 20, 2024
6e8c976
TestBlockBuilder_StartWithExistingCommit_PullMode
seizethedave Dec 20, 2024
cdadc8c
Add a jobIteration counter for tests.
seizethedave Dec 20, 2024
c85ce3a
More pull-mode tests.
seizethedave Dec 20, 2024
1f04656
Expose partition as a param to produceSamples
seizethedave Dec 21, 2024
904efbc
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Dec 21, 2024
96b34c9
Add a second job to the general PullMode test.
seizethedave Dec 21, 2024
a2eab72
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Jan 2, 2025
bf1f77d
Add job key details to log fields.
seizethedave Jan 2, 2025
3738b7f
Also make sure workerID isn't empty.
seizethedave Jan 2, 2025
a1e2945
remove 'failed' language from wrapped error string.
seizethedave Jan 2, 2025
6206183
Temporarily fix weird test flake.
seizethedave Jan 2, 2025
0062ff8
Cleanup of var names/comments.
seizethedave Jan 2, 2025
4e8dd7a
Validate completion keys.
seizethedave Jan 2, 2025
929ab6c
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Jan 16, 2025
eed567b
Flush and close gRPC conn on shutdown.
seizethedave Jan 16, 2025
fd9f31d
mv blockBuilderPullModeConfig to blockbuilder_test.go.
seizethedave Jan 16, 2025
0f4a620
Do some assertions of scheduler call counts.
seizethedave Jan 16, 2025
ded6790
Less parallel tests.
seizethedave Jan 16, 2025
78f4bbb
Replace Flush scheme with a Close method that runs after the run-loop…
seizethedave Jan 17, 2025
eeeb3ac
Replace counts() with single-use count getters.
seizethedave Jan 17, 2025
baf97cf
Verify Close() called on shutdown.
seizethedave Jan 17, 2025
53a889c
Fix order of args to Equal().
seizethedave Jan 17, 2025
6 changes: 6 additions & 0 deletions development/mimir-ingest-storage/config/mimir.yaml
@@ -26,6 +26,12 @@ ingester:
min_partition_owners_duration: 10s
delete_inactive_partition_after: 1m

block_builder:
scheduler_config:
address: mimir-block-builder-scheduler-1:9095
update_interval: 5s
max_update_age: 30m

blocks_storage:
s3:
bucket_name: mimir-blocks
169 changes: 152 additions & 17 deletions pkg/blockbuilder/blockbuilder.go
@@ -15,11 +15,16 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/runutil"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/mimir/pkg/blockbuilder/schedulerpb"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/ingest"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
@@ -37,6 +42,11 @@ type BlockBuilder struct {
limits *validation.Overrides
kafkaClient *kgo.Client
bucket objstore.Bucket
scheduler schedulerpb.SchedulerClient
committer stateCommitter

// the current job iteration number. For tests.
jobIteration atomic.Int64

assignedPartitionIDs []int32
// fallbackOffsetMillis is the milliseconds timestamp after which a partition that doesn't have a commit will be consumed from.
@@ -51,6 +61,18 @@ func New(
logger log.Logger,
reg prometheus.Registerer,
limits *validation.Overrides,
) (*BlockBuilder, error) {
return newWithSchedulerClient(cfg, logger, reg, limits, nil)
}

// newWithSchedulerClient creates a new BlockBuilder with a scheduler client.
// This is exposed for testing purposes. You should probably be using New().
func newWithSchedulerClient(
cfg Config,
logger log.Logger,
reg prometheus.Registerer,
limits *validation.Overrides,
schedulerClient schedulerpb.SchedulerClient,
) (*BlockBuilder, error) {
b := &BlockBuilder{
cfg: cfg,
@@ -61,23 +83,67 @@ func New(
tsdbBuilderMetrics: newTSDBBBuilderMetrics(reg),
}

b.assignedPartitionIDs = b.cfg.PartitionAssignment[b.cfg.InstanceID]
if len(b.assignedPartitionIDs) == 0 {
// This is just an assertion check. The config validation prevents this from happening.
return nil, fmt.Errorf("no partitions assigned to instance %s", b.cfg.InstanceID)
}

bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorage.Bucket, "block-builder", logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create the bucket client: %w", err)
}
b.bucket = bucketClient

b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
var runningFunc services.RunningFn

if cfg.SchedulerConfig.Address != "" {
// Pull mode: we learn about jobs from a block-builder-scheduler.

if schedulerClient != nil {
b.scheduler = schedulerClient
} else {
var err error
if b.scheduler, err = b.makeSchedulerClient(); err != nil {
return nil, fmt.Errorf("make scheduler client: %w", err)
}
}

runningFunc = b.runningPullMode
b.committer = &noOpCommitter{}
} else {
// Standalone mode: we consume from statically assigned partitions.
b.assignedPartitionIDs = b.cfg.PartitionAssignment[b.cfg.InstanceID]
if len(b.assignedPartitionIDs) == 0 {
// This is just an assertion check. The config validation prevents this from happening.
return nil, fmt.Errorf("no partitions assigned to instance %s", b.cfg.InstanceID)
}

runningFunc = b.runningStandaloneMode
b.committer = &kafkaCommitter{}
}

b.Service = services.NewBasicService(b.starting, runningFunc, b.stopping)
return b, nil
}

func (b *BlockBuilder) makeSchedulerClient() (schedulerpb.SchedulerClient, error) {
dialOpts, err := b.cfg.SchedulerConfig.GRPCClientConfig.DialOption(
[]grpc.UnaryClientInterceptor{otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())},
nil)
if err != nil {
return nil, err
}

// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.Dial(b.cfg.SchedulerConfig.Address, dialOpts...)
if err != nil {
return nil, err
}
seizethedave marked this conversation as resolved.

return schedulerpb.NewSchedulerClient(
b.cfg.InstanceID,
schedulerpb.NewBlockBuilderSchedulerClient(conn),
b.logger,
b.cfg.SchedulerConfig.UpdateInterval,
b.cfg.SchedulerConfig.MaxUpdateAge,
)
}

func (b *BlockBuilder) starting(context.Context) (err error) {
// Empty any previous artifacts.
if err := os.RemoveAll(b.cfg.DataDir); err != nil {
@@ -104,11 +170,65 @@ func (b *BlockBuilder) starting(context.Context) (err error) {

func (b *BlockBuilder) stopping(_ error) error {
b.kafkaClient.Close()

return nil
}

func (b *BlockBuilder) running(ctx context.Context) error {
// runningPullMode is a service `running` function for pull mode, where we learn
// about jobs from a block-builder-scheduler. We consume one job at a time.
func (b *BlockBuilder) runningPullMode(ctx context.Context) error {
// Kick off the scheduler's run loop.
go b.scheduler.Run(ctx)

for {
if err := ctx.Err(); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

key, spec, err := b.scheduler.GetJob(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
level.Error(b.logger).Log("msg", "failed to get job", "err", err)
continue
}

if _, err := b.consumeJob(ctx, key, spec); err != nil {
level.Error(b.logger).Log("msg", "failed to consume job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
continue
}
Comment on lines +212 to +215

Member:
It would be nice to tell the scheduler that the job failed. But I guess the scheduler will know when a job has not received an update for some (short) time?

Contributor Author (seizethedave):
Yeah, I think I recall discussing this in a design document. The scheduler will know a job has failed when it doesn't receive an update within X seconds. We can always enhance this by adding failure info to the UpdateJob RPC. Initially I'm just keeping it barebones.


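A minimal sketch of the lease-expiry idea discussed in this thread, assuming a hypothetical in-memory scheduler; the `jobKey`, `lease`, `onUpdate`, and `reapExpired` names are illustrative and are not part of this PR's schedulerpb package:

```go
// Hypothetical sketch, not code from this PR: one way a scheduler can detect a
// failed job is to keep a per-job lease that each UpdateJob call renews, and to
// reassign any job whose lease expires. All names here are illustrative.
package main

import (
	"fmt"
	"sync"
	"time"
)

type jobKey struct {
	ID    string
	Epoch int64
}

type lease struct {
	worker   string
	deadline time.Time
}

type scheduler struct {
	mu       sync.Mutex
	leaseDur time.Duration
	inflight map[jobKey]*lease
}

// onUpdate would be called for every UpdateJob RPC; it renews the job's lease.
func (s *scheduler) onUpdate(key jobKey, worker string, now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if l, ok := s.inflight[key]; ok && l.worker == worker {
		l.deadline = now.Add(s.leaseDur)
	}
}

// reapExpired removes jobs whose worker stopped sending updates and returns
// them so they can be put back on the queue for reassignment.
func (s *scheduler) reapExpired(now time.Time) []jobKey {
	s.mu.Lock()
	defer s.mu.Unlock()
	var expired []jobKey
	for key, l := range s.inflight {
		if now.After(l.deadline) {
			expired = append(expired, key)
			delete(s.inflight, key)
		}
	}
	return expired
}

func main() {
	s := &scheduler{
		leaseDur: 30 * time.Second,
		inflight: map[jobKey]*lease{
			{ID: "ingest/64/1000", Epoch: 10}: {worker: "block-builder-0", deadline: time.Now().Add(-time.Minute)},
		},
	}
	// The stale job surfaces as expired and could be handed to another worker.
	fmt.Println("expired:", s.reapExpired(time.Now()))
}
```
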
if err := b.scheduler.CompleteJob(key); err != nil {
level.Error(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
}

b.jobIteration.Inc()
}
}

// consumeJob performs block consumption from Kafka into object storage based on the given job spec.
func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, spec schedulerpb.JobSpec) (PartitionState, error) {
state := PartitionState{
Commit: kadm.Offset{
Topic: spec.Topic,
Partition: spec.Partition,
At: spec.StartOffset,
},
CommitRecordTimestamp: spec.CommitRecTs,
LastSeenOffset: spec.LastSeenOffset,
LastBlockEnd: spec.LastBlockEndTs,
}

logger := log.With(b.logger, "job_id", key.Id, "job_epoch", key.Epoch)
return b.consumePartition(ctx, spec.Partition, state, spec.CycleEndTs, spec.CycleEndOffset, logger)
}

// runningStandaloneMode is a service `running` function for standalone mode,
// where we consume from statically assigned partitions.
func (b *BlockBuilder) runningStandaloneMode(ctx context.Context) error {
// Do initial consumption on start using current time as the point up to which we are consuming.
// To avoid small blocks at startup, we consume until the <consume interval> boundary + buffer.
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
@@ -204,7 +324,7 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEndTime time.T
}

state := PartitionStateFromLag(b.logger, lag, b.fallbackOffsetMillis)
if err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
if _, err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset, b.logger); err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "partition", partition)
}
}
@@ -289,11 +409,11 @@ func PartitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackM

// consumePartition consumes records from the given partition until the cycleEnd timestamp.
// If the partition is lagging behind, it takes care of consuming it in sections.
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state PartitionState, cycleEndTime time.Time, cycleEndOffset int64) (err error) {
sp, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BlockBuilder.consumePartition")
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state PartitionState, cycleEndTime time.Time, cycleEndOffset int64, logger log.Logger) (finalState PartitionState, err error) {
sp, ctx := spanlogger.NewWithLogger(ctx, logger, "BlockBuilder.consumePartition")
defer sp.Finish()

logger := log.With(sp, "partition", partition, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset)
logger = log.With(sp, "partition", partition, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset)

builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics, b.cfg.ApplyMaxGlobalSeriesPerUserBelow)
defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")
@@ -317,12 +437,12 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, st
logger := log.With(logger, "section_end", sectionEndTime, "offset", state.Commit.At)
state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEndTime, cycleEndOffset)
if err != nil {
return fmt.Errorf("consume partition %d: %w", partition, err)
return PartitionState{}, fmt.Errorf("consume partition %d: %w", partition, err)
}
sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeInterval)
}

return nil
return state, nil
}

func (b *BlockBuilder) consumePartitionSection(
@@ -491,14 +611,20 @@ consumerLoop:
LastSeenOffset: lastSeenOffset,
LastBlockEnd: lastBlockEnd,
}
if err := b.commitState(ctx, logger, b.cfg.ConsumerGroup, newState); err != nil {
if err := b.committer.commitState(ctx, b, logger, b.cfg.ConsumerGroup, newState); err != nil {
return state, err
}

return newState, nil
}

func (b *BlockBuilder) commitState(ctx context.Context, logger log.Logger, group string, state PartitionState) error {
type stateCommitter interface {
commitState(context.Context, *BlockBuilder, log.Logger, string, PartitionState) error
}

type kafkaCommitter struct{}

func (c *kafkaCommitter) commitState(ctx context.Context, b *BlockBuilder, logger log.Logger, group string, state PartitionState) error {
offsets := make(kadm.Offsets)
offsets.Add(state.Commit)

@@ -520,10 +646,19 @@ func (b *BlockBuilder) commitState(ctx context.Context, logger log.Logger, group
}

level.Info(logger).Log("msg", "successfully committed offset to kafka", "offset", state.Commit.At)
return nil
}

var _ stateCommitter = &kafkaCommitter{}

type noOpCommitter struct{}

func (c *noOpCommitter) commitState(_ context.Context, _ *BlockBuilder, _ log.Logger, _ string, _ PartitionState) error {
return nil
}

var _ stateCommitter = &noOpCommitter{}

func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, blockIDs []string) error {
buc := bucket.NewUserBucketClient(tenantID, b.bucket, b.limits)
for _, bid := range blockIDs {