refactor(blooms): Implement retry in builder (#13306)
salvacorts authored Jun 26, 2024
1 parent 0c289a8 commit 8ebce00
Showing 4 changed files with 139 additions and 29 deletions.
13 changes: 13 additions & 0 deletions docs/sources/shared/configuration.md
@@ -377,6 +377,19 @@ bloom_build:
# CLI flag: -bloom-build.builder.planner-address
[planner_address: <string> | default = ""]

backoff_config:
# Minimum delay when backing off.
# CLI flag: -bloom-build.builder.backoff.backoff-min-period
[min_period: <duration> | default = 100ms]

# Maximum delay when backing off.
# CLI flag: -bloom-build.builder.backoff.backoff-max-period
[max_period: <duration> | default = 10s]

# Number of times to backoff and retry before failing.
# CLI flag: -bloom-build.builder.backoff.backoff-retries
[max_retries: <int> | default = 10]

# Experimental: The bloom_gateway block configures the Loki bloom gateway
# server, responsible for serving queries for filtering chunks based on filter
# expressions.
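For illustration, this is roughly how the new retry settings documented above could be set in a Loki configuration file. The nesting under bloom_build.builder is an assumption inferred from the -bloom-build.builder.* CLI flag prefix, and the planner address and values are made-up examples rather than defaults from this commit:

# Hypothetical override of the builder retry settings; the bloom_build.builder
# nesting is assumed from the CLI flag prefix and the values are examples only.
bloom_build:
  builder:
    planner_address: "bloom-planner:9095"
    backoff_config:
      min_period: 200ms   # minimum delay between reconnect attempts
      max_period: 30s     # cap for the exponential backoff
      max_retries: 20     # give up after 20 attempts
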
57 changes: 48 additions & 9 deletions pkg/bloombuild/builder/builder.go
@@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
@@ -110,12 +111,36 @@ func (b *Builder) stopping(_ error) error {
}

func (b *Builder) running(ctx context.Context) error {
// Retry if the connection to the planner is lost.
retries := backoff.New(ctx, b.cfg.BackoffConfig)
for retries.Ongoing() {
err := b.connectAndBuild(ctx)
if err == nil || errors.Is(err, context.Canceled) {
break
}

level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err)
retries.Wait()
}

if err := retries.Err(); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("failed to connect and build: %w", err)
}

return nil
}

func (b *Builder) connectAndBuild(
ctx context.Context,
) error {
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

// TODO: Wrap hereafter in retry logic
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
@@ -150,8 +175,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
}

for b.State() == services.Running {
// When the planner connection closes or the builder stops, the context
// will be canceled and the loop will exit.
// When the planner connection closes, an EOF or "planner shutting down" error is returned.
// When the builder is shutting down, a gRPC context canceled error is returned.
protoTask, err := c.Recv()
if err != nil {
if status.Code(err) == codes.Canceled {
@@ -162,14 +187,16 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
return fmt.Errorf("failed to receive task from planner: %w", err)
}

logger := log.With(b.logger, "task", protoTask.Task.Id)

b.metrics.taskStarted.Inc()
start := time.Now()
status := statusSuccess

newMetas, err := b.processTask(c.Context(), protoTask.Task)
if err != nil {
status = statusFailure
level.Error(b.logger).Log("msg", "failed to process task", "err", err)
level.Error(logger).Log("msg", "failed to process task", "err", err)
}

b.metrics.taskCompleted.WithLabelValues(status).Inc()
@@ -197,13 +224,25 @@ func (b *Builder) notifyTaskCompletedToPlanner(
CreatedMetas: metas,
}

// TODO: Implement retry
if err := c.Send(&protos.BuilderToPlanner{
BuilderID: b.ID,
Result: *result.ToProtoTaskResult(),
}); err != nil {
// We have a retry mechanism higher up the stack, but we add another one here
// to try our best to avoid losing the task result.
retries := backoff.New(c.Context(), b.cfg.BackoffConfig)
for retries.Ongoing() {
err := c.Send(&protos.BuilderToPlanner{
BuilderID: b.ID,
Result: *result.ToProtoTaskResult(),
})
if err == nil {
break
}

level.Error(b.logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err)
retries.Wait()
}

if err := retries.Err(); err != nil {
return fmt.Errorf("failed to acknowledge task completion to planner: %w", err)
}

return nil
}

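As background for the two retry loops added above, the following is a minimal, self-contained sketch of the dskit backoff pattern they use (backoff.New, Ongoing, Wait, Err). The flakyConnect helper and its failure count are invented for illustration; only the backoff API usage mirrors the commit:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flakyConnect is a hypothetical stand-in for connectAndBuild: it fails the
// first few attempts so the retry loop has something to do.
func flakyConnect(attempt int) error {
	if attempt < 3 {
		return errors.New("planner unreachable")
	}
	return nil
}

func main() {
	ctx := context.Background()

	retries := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond, // same default as documented above
		MaxBackoff: 10 * time.Second,
		MaxRetries: 5,
	})

	attempt := 0
	for retries.Ongoing() {
		err := flakyConnect(attempt)
		if err == nil || errors.Is(err, context.Canceled) {
			break // success, or the caller canceled the context
		}
		attempt++
		fmt.Println("attempt failed, backing off:", err)
		retries.Wait() // sleeps with exponential backoff; returns early if ctx is canceled
	}

	// Err is non-nil only if the loop gave up: retries exhausted or context canceled.
	if err := retries.Err(); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("connected after", attempt, "failed attempts")
}
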
95 changes: 75 additions & 20 deletions pkg/bloombuild/builder/builder_test.go
@@ -4,10 +4,12 @@ import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
@@ -26,6 +28,7 @@ import (

func Test_BuilderLoop(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
@@ -69,9 +72,17 @@ func Test_BuilderLoop(t *testing.T) {
server, err := newFakePlannerServer(tasks)
require.NoError(t, err)

// Start the server so the builder can connect and receive tasks.
server.Start()

limits := fakeLimits{}
cfg := Config{
PlannerAddress: server.Addr(),
BackoffConfig: backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
},
}
flagext.DefaultValues(&cfg.GrpcConfig)

@@ -87,10 +98,28 @@ func Test_BuilderLoop(t *testing.T) {
err = services.StartAndAwaitRunning(context.Background(), builder)
require.NoError(t, err)

// Wait for at least one task to be processed.
require.Eventually(t, func() bool {
return int(server.completedTasks.Load()) == len(tasks)
return server.CompletedTasks() > 0
}, 5*time.Second, 100*time.Millisecond)

// Right after, stop the server so the connection breaks and the builder has to retry.
server.Stop()

// While the server is stopped, the builder should keep retrying to connect but no tasks should be processed.
// Note this is just a way to sleep while making sure no tasks are processed.
tasksProcessedSoFar := server.CompletedTasks()
require.Never(t, func() bool {
return server.CompletedTasks() > tasksProcessedSoFar
}, 5*time.Second, 500*time.Millisecond)

// Now we start the server so the builder can connect and receive tasks.
server.Start()

require.Eventually(t, func() bool {
return server.CompletedTasks() >= len(tasks)
}, 30*time.Second, 500*time.Millisecond)

err = services.StopAndAwaitTerminated(context.Background(), builder)
require.NoError(t, err)

@@ -102,41 +131,62 @@ type fakePlannerServer struct {
completedTasks atomic.Int64
shutdownCalled bool

addr string
listenAddr string
grpcServer *grpc.Server
wg sync.WaitGroup
}

func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, err
}

server := &fakePlannerServer{
tasks: tasks,
addr: lis.Addr().String(),
grpcServer: grpc.NewServer(),
tasks: tasks,
}

protos.RegisterPlannerForBuilderServer(server.grpcServer, server)
go func() {
if err := server.grpcServer.Serve(lis); err != nil {
panic(err)
}
}()

return server, nil
}

func (f *fakePlannerServer) Addr() string {
return f.addr
if f.listenAddr == "" {
panic("server not started")
}
return f.listenAddr
}

func (f *fakePlannerServer) Stop() {
f.grpcServer.Stop()
if f.grpcServer != nil {
f.grpcServer.Stop()
}

f.wg.Wait()
}

func (f *fakePlannerServer) Start() {
f.Stop()

lisAddr := "localhost:0"
if f.listenAddr != "" {
// Reuse the same address if the server was stopped and started again.
lisAddr = f.listenAddr
}

lis, err := net.Listen("tcp", lisAddr)
if err != nil {
panic(err)
}
f.listenAddr = lis.Addr().String()

f.grpcServer = grpc.NewServer()
protos.RegisterPlannerForBuilderServer(f.grpcServer, f)
go func() {
if err := f.grpcServer.Serve(lis); err != nil {
panic(err)
}
}()
}

func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error {
f.wg.Add(1)
defer f.wg.Done()

// Receive Ready
if _, err := srv.Recv(); err != nil {
return fmt.Errorf("failed to receive ready: %w", err)
@@ -149,14 +199,19 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop
if _, err := srv.Recv(); err != nil {
return fmt.Errorf("failed to receive task response: %w", err)
}
f.completedTasks.Add(1)
time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.
f.completedTasks.Inc()
}

// No more tasks. Wait until shutdown.
<-srv.Context().Done()
return nil
}

func (f *fakePlannerServer) CompletedTasks() int {
return int(f.completedTasks.Load())
}

func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
f.shutdownCalled = true
return &protos.NotifyBuilderShutdownResponse{}, nil
3 changes: 3 additions & 0 deletions pkg/bloombuild/builder/config.go
@@ -4,19 +4,22 @@ import (
"flag"
"fmt"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
)

// Config configures the bloom-builder component.
type Config struct {
GrpcConfig grpcclient.Config `yaml:"grpc_config"`
PlannerAddress string `yaml:"planner_address"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-builder configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner")
cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f)
}

func (cfg *Config) Validate() error {
