Block-builder: pull jobs from scheduler #10118

Merged
merged 79 commits into from
Jan 18, 2025
f457910
Merge from davidgrant/block-builder-scheduler-reconcile
seizethedave Nov 18, 2024
8f9d996
Don't need to get fancy with critical section: no contention is possi…
seizethedave Nov 18, 2024
6712985
Document s.committed.
seizethedave Nov 18, 2024
f97e4cb
Merge
seizethedave Nov 24, 2024
4cfeed9
Fix merge slop.
seizethedave Nov 24, 2024
eaed668
Add proto defs.
seizethedave Nov 4, 2024
d37b1f2
Initial RPC protos.
seizethedave Nov 4, 2024
35d1868
Log an error if flushing fails.
seizethedave Nov 24, 2024
81dee72
Include RPC forms of assignJob/updateJob.
seizethedave Nov 4, 2024
3772a12
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Nov 25, 2024
7c3f2da
Fleshing out the client module.
seizethedave Nov 25, 2024
db44d45
Appease the linter.
seizethedave Nov 25, 2024
5cc39ce
workerID param not used.
seizethedave Nov 25, 2024
3c6a566
CompleteJob: no context needed.
seizethedave Nov 25, 2024
91b8e83
The linter.
seizethedave Nov 25, 2024
7355de9
More logs.
seizethedave Nov 25, 2024
c34617d
Fix circular reference. Initialize client.
seizethedave Nov 29, 2024
9f67caa
schedulerproto -> schedulerpb
seizethedave Nov 29, 2024
47b5662
merge
seizethedave Nov 29, 2024
188217f
Fix compile stuff.
seizethedave Nov 30, 2024
179018e
Getting builder/scheduler up in docker-compose.
seizethedave Nov 30, 2024
f789447
Clean unneeded flags.
seizethedave Nov 30, 2024
bad7dc2
Fixes for BB<>scheduler communications:
seizethedave Nov 30, 2024
762012f
Fix block-builder's scheduler_config so it can connect to a scheduler.
seizethedave Nov 30, 2024
671f4b3
Shorter update interval for testing.
seizethedave Nov 30, 2024
f760fa8
Fix unloggable key.
seizethedave Nov 30, 2024
df7e43e
Longer job working time.
seizethedave Nov 30, 2024
6ad943c
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Dec 2, 2024
63313b0
Undo blockbuilder.go hacking.
seizethedave Dec 2, 2024
509e9a8
Peel off block-builder changes not ready for this PR.
seizethedave Dec 2, 2024
fcdd240
Fix log message when creating a new job.
seizethedave Dec 2, 2024
77b00ac
Add some tests for scheduler client module. And fix some things found…
seizethedave Dec 2, 2024
6ed93cc
Linter.
seizethedave Dec 2, 2024
23e0db2
Lint: AGPL header.
seizethedave Dec 2, 2024
fa14a29
More info in the SchedulerClient comment.
seizethedave Dec 2, 2024
54f1af4
Remove superfluous comment.
seizethedave Dec 2, 2024
02d7856
Defer Kafka offset flushing to a later PR.
seizethedave Dec 2, 2024
b175c80
Revert "Undo blockbuilder.go hacking."
seizethedave Dec 3, 2024
b419dd7
Revert "Peel off block-builder changes not ready for this PR."
seizethedave Dec 3, 2024
cb250cd
Separate run functions for standalone, pull modes.
seizethedave Dec 3, 2024
5e53ac8
Working on a separate codepath for 'pull mode'.
seizethedave Dec 4, 2024
9944af4
don't need PartitionState.Clone.
seizethedave Dec 4, 2024
ecafbf5
Better startup initialization. Restore the assigned partition validat…
seizethedave Dec 4, 2024
5af6624
Lint.
seizethedave Dec 4, 2024
f17ee08
More changes to blockbuilder to support testing. Fix terminating loop…
seizethedave Dec 5, 2024
f980d09
Working on a pull-mode consumption test.
seizethedave Dec 5, 2024
815559f
merge
seizethedave Dec 19, 2024
9d238a2
Fix build.
seizethedave Dec 19, 2024
727f684
lint
seizethedave Dec 19, 2024
3b766a0
Add CycleEndTs, CycleEndOffset fields to JobSpec.
seizethedave Dec 20, 2024
d42f7bb
Use new JobSpec fields in consumeJob
seizethedave Dec 20, 2024
4281f8c
Fix pull-mode test.
seizethedave Dec 20, 2024
23024bc
Make proto fields consistent.
seizethedave Dec 20, 2024
f9fa09d
Re-fix test.
seizethedave Dec 20, 2024
47f30b9
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Dec 20, 2024
3c873ef
Remove erroneous provenance headers.
seizethedave Dec 20, 2024
32aaa02
Pull-mode-ize another block-builder test.
seizethedave Dec 20, 2024
6e8c976
TestBlockBuilder_StartWithExistingCommit_PullMode
seizethedave Dec 20, 2024
cdadc8c
Add a jobIteration counter for tests.
seizethedave Dec 20, 2024
c85ce3a
More pull-mode tests.
seizethedave Dec 20, 2024
1f04656
Expose partition as a param to produceSamples
seizethedave Dec 21, 2024
904efbc
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Dec 21, 2024
96b34c9
Add a second job to the general PullMode test.
seizethedave Dec 21, 2024
a2eab72
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Jan 2, 2025
bf1f77d
Add job key details to log fields.
seizethedave Jan 2, 2025
3738b7f
Also make sure workerID isn't empty.
seizethedave Jan 2, 2025
a1e2945
remove 'failed' language from wrapped error string.
seizethedave Jan 2, 2025
6206183
Temporarily fix weird test flake.
seizethedave Jan 2, 2025
0062ff8
Cleanup of var names/comments.
seizethedave Jan 2, 2025
4e8dd7a
Validate completion keys.
seizethedave Jan 2, 2025
929ab6c
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Jan 16, 2025
eed567b
Flush and close gRPC conn on shutdown.
seizethedave Jan 16, 2025
fd9f31d
mv blockBuilderPullModeConfig to blockbuilder_test.go.
seizethedave Jan 16, 2025
0f4a620
Do some assertions of scheduler call counts.
seizethedave Jan 16, 2025
ded6790
Less parallel tests.
seizethedave Jan 16, 2025
78f4bbb
Replace Flush scheme with a Close method that runs after the run-loop…
seizethedave Jan 17, 2025
eeeb3ac
Replace counts() with single-use count getters.
seizethedave Jan 17, 2025
baf97cf
Verify Close() called on shutdown.
seizethedave Jan 17, 2025
53a889c
Fix order of args to Equal().
seizethedave Jan 17, 2025
Working on a separate codepath for 'pull mode'.
seizethedave committed Dec 4, 2024
commit 5e53ac8dd05971ce3867ad27cacf30ef84915121
126 changes: 89 additions & 37 deletions pkg/blockbuilder/blockbuilder.go
@@ -43,6 +43,7 @@ type BlockBuilder struct {
 	bucket objstore.Bucket

 	scheduler schedulerpb.SchedulerClient
+	committer stateCommitter

 	assignedPartitionIDs []int32
 	// fallbackOffsetMillis is the milliseconds timestamp after which a partition that doesn't have a commit will be consumed from.
@@ -75,7 +76,9 @@ func New(
 	}
 	b.bucket = bucketClient

-	runFunc := b.runningStandaloneMode
+	// The running func and committed impl are different based on the presence of a scheduler.
+	runningFunc := b.runningStandaloneMode
+	b.committer = &kafkaCommitter{}

 	if cfg.SchedulerConfig.Address != "" {
 		sched, err := b.makeSchedulerClient()
@@ -85,11 +88,11 @@ func New(
 		b.scheduler = sched

 		// If a scheduler is configured, we run in pull mode.
-		runFunc = b.runningPullMode
+		runningFunc = b.runningPullMode
+		b.committer = &noOpCommitter{}
 	}

-	b.Service = services.NewBasicService(b.starting, runFunc, b.stopping)
-
+	b.Service = services.NewBasicService(b.starting, runningFunc, b.stopping)
 	return b, nil
 }

@@ -145,8 +148,58 @@ func (b *BlockBuilder) stopping(_ error) error {
 	return nil
 }

-// This is a service `running` function for standalone mode, where we consume
-// from statically assigned partitions.
+// runningPullMode is a service `running` function for pull mode, where we learn
+// about jobs from a block-builder-scheduler.
+func (b *BlockBuilder) runningPullMode(ctx context.Context) error {
+	// Kick off the scheduler's run loop with its own dependent subcontext.
+	sctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go b.scheduler.Run(sctx)
+
+	for {
+		key, spec, err := b.scheduler.GetJob(ctx)
+		if err != nil {
+			if errors.Is(err, context.Canceled) {
+				return nil
+			}
+			level.Warn(b.logger).Log("msg", "failed to get job", "err", err)
+			continue
+		}
+
+		if _, err := b.consumeJob(ctx, key, spec); err != nil {
+			level.Error(b.logger).Log("msg", "failed to consume job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
+			continue // ?
+		}
+
+		// TODO: CompleteJob needs to accept the new state returned from consumeJob.
+
+		if err := b.scheduler.CompleteJob(key); err != nil {
+			level.Error(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
+		}
+	}
+}
+
+func (b *BlockBuilder) consumeJob(ctx context.Context, _ schedulerpb.JobKey, jobSpec schedulerpb.JobSpec) (PartitionState, error) {
+	state := PartitionState{
+		Commit: kadm.Offset{
+			Topic: jobSpec.Topic,
+			Partition: jobSpec.Partition,
+			At: jobSpec.StartOffset,
+			Metadata: "{}", // FIXME. This and other fields need to be plumbed in via the scheduler RPC layer.
+		},
+		CommitRecordTimestamp: jobSpec.CommitRecTs,
+		LastSeenOffset: jobSpec.LastSeenOffset,
+		LastBlockEnd: jobSpec.LastBlockEndTs,
+	}
+
+	cycleEndTime := jobSpec.LastBlockEndTs // ???
+	cycleEndOffset := jobSpec.EndOffset // ???
+
+	return b.consumePartition(ctx, jobSpec.Partition, state, cycleEndTime, cycleEndOffset)
+}
+
+// runningStandaloneMode is a service `running` function for standalone mode,
+// where we consume from statically assigned partitions.
 func (b *BlockBuilder) runningStandaloneMode(ctx context.Context) error {
 	// Do initial consumption on start using current time as the point up to which we are consuming.
 	// To avoid small blocks at startup, we consume until the <consume interval> boundary + buffer.
@@ -181,31 +234,6 @@ func (b *BlockBuilder) runningStandaloneMode(ctx context.Context) error {
 	}
 }

-// This is a service `running` function for pull mode, where we learn about
-// jobs from a block-builder-scheduler.
-func (b *BlockBuilder) runningPullMode(ctx context.Context) error {
-	sctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	go b.scheduler.Run(sctx)
-
-	for {
-		key, _, err := b.scheduler.GetJob(ctx)
-		if err != nil {
-			if errors.Is(err, context.Canceled) {
-				return nil
-			}
-			level.Warn(b.logger).Log("msg", "failed to get job", "err", err)
-			continue
-		}
-
-		time.Sleep(11 * time.Second)
-
-		if err := b.scheduler.CompleteJob(key); err != nil {
-			level.Warn(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch)
-		}
-	}
-}
-
 // cycleEndAtStartup returns the timestamp of the first cycleEnd relative to the start time t.
 // One cycle is a duration of one interval plus extra time buffer.
 func cycleEndAtStartup(t time.Time, interval, buffer time.Duration) time.Time {
@@ -268,7 +296,7 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEndTime time.T
 		}

 		state := PartitionStateFromLag(b.logger, lag, b.fallbackOffsetMillis)
-		if err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
+		if _, err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
 			level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "partition", partition)
 		}
 	}
@@ -314,6 +342,15 @@ type PartitionState struct {
 	LastBlockEnd time.Time
 }

+func (s PartitionState) Clone() PartitionState {
+	return PartitionState{
+		Commit: s.Commit,
+		CommitRecordTimestamp: s.CommitRecordTimestamp,
+		LastSeenOffset: s.LastSeenOffset,
+		LastBlockEnd: s.LastBlockEnd,
+	}
+}
+
 func PartitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackMillis int64) PartitionState {
 	commitRecTs, lastSeenOffset, lastBlockEndTs, err := unmarshallCommitMeta(lag.Commit.Metadata)
 	if err != nil {
@@ -353,7 +390,7 @@ func PartitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackM

 // consumePartition consumes records from the given partition until the cycleEnd timestamp.
 // If the partition is lagging behind, it takes care of consuming it in sections.
-func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state PartitionState, cycleEndTime time.Time, cycleEndOffset int64) (err error) {
+func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state PartitionState, cycleEndTime time.Time, cycleEndOffset int64) (finalState PartitionState, err error) {
 	sp, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BlockBuilder.consumePartition")
 	defer sp.Finish()

@@ -381,12 +418,12 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, st
 		logger := log.With(logger, "section_end", sectionEndTime, "offset", state.Commit.At)
 		state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEndTime, cycleEndOffset)
 		if err != nil {
-			return fmt.Errorf("consume partition %d: %w", partition, err)
+			return PartitionState{}, fmt.Errorf("consume partition %d: %w", partition, err)
 		}
 		sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeInterval)
 	}

-	return nil
+	return state, nil
 }

 func (b *BlockBuilder) consumePartitionSection(
@@ -555,14 +592,20 @@ consumerLoop:
 		LastSeenOffset: lastSeenOffset,
 		LastBlockEnd: lastBlockEnd,
 	}
-	if err := b.commitState(ctx, logger, b.cfg.ConsumerGroup, newState); err != nil {
+	if err := b.committer.commitState(ctx, b, logger, b.cfg.ConsumerGroup, newState); err != nil {
 		return state, err
 	}

 	return newState, nil
 }

-func (b *BlockBuilder) commitState(ctx context.Context, logger log.Logger, group string, state PartitionState) error {
+type stateCommitter interface {
+	commitState(context.Context, *BlockBuilder, log.Logger, string, PartitionState) error
+}
+
+type kafkaCommitter struct{}
+
+func (c *kafkaCommitter) commitState(ctx context.Context, b *BlockBuilder, logger log.Logger, group string, state PartitionState) error {
 	offsets := make(kadm.Offsets)
 	offsets.Add(state.Commit)

@@ -584,10 +627,19 @@ func (b *BlockBuilder) commitState(ctx context.Context, logger log.Logger, group
 	}

 	level.Info(logger).Log("msg", "successfully committed offset to kafka", "offset", state.Commit.At)
 	return nil
 }

+var _ stateCommitter = &kafkaCommitter{}
+
+type noOpCommitter struct{}
+
+func (c *noOpCommitter) commitState(ctx context.Context, b *BlockBuilder, logger log.Logger, _ string, state PartitionState) error {
+	return nil
+}
+
+var _ stateCommitter = &noOpCommitter{}
+
 func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, blockIDs []string) error {
 	buc := bucket.NewUserBucketClient(tenantID, b.bucket, b.limits)
 	for _, bid := range blockIDs {