diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 89d461541845..a3966db2f9af 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -377,6 +377,19 @@ bloom_build: # CLI flag: -bloom-build.builder.planner-address [planner_address: | default = ""] + backoff_config: + # Minimum delay when backing off. + # CLI flag: -bloom-build.builder.backoff.backoff-min-period + [min_period: | default = 100ms] + + # Maximum delay when backing off. + # CLI flag: -bloom-build.builder.backoff.backoff-max-period + [max_period: | default = 10s] + + # Number of times to backoff and retry before failing. + # CLI flag: -bloom-build.builder.backoff.backoff-retries + [max_retries: | default = 10] + # Experimental: The bloom_gateway block configures the Loki bloom gateway # server, responsible for serving queries for filtering chunks based on filter # expressions. diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 3a5638ab4665..f05c1fc08fc3 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/pkg/errors" @@ -110,12 +111,36 @@ func (b *Builder) stopping(_ error) error { } func (b *Builder) running(ctx context.Context) error { + // Retry if the connection to the planner is lost. + retries := backoff.New(ctx, b.cfg.BackoffConfig) + for retries.Ongoing() { + err := b.connectAndBuild(ctx) + if err == nil || errors.Is(err, context.Canceled) { + break + } + + level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err) + retries.Wait() + } + + if err := retries.Err(); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return fmt.Errorf("failed to connect and build: %w", err) + } + + return nil +} + +func (b *Builder) connectAndBuild( + ctx context.Context, +) error { opts, err := b.cfg.GrpcConfig.DialOption(nil, nil) if err != nil { return fmt.Errorf("failed to create grpc dial options: %w", err) } - // TODO: Wrap hereafter in retry logic conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...) if err != nil { return fmt.Errorf("failed to dial bloom planner: %w", err) @@ -150,8 +175,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro } for b.State() == services.Running { - // When the planner connection closes or the builder stops, the context - // will be canceled and the loop will exit. + // When the planner connection closes, an EOF or "planner shutting down" error is returned. + // When the builder is shutting down, a gRPC context canceled error is returned. protoTask, err := c.Recv() if err != nil { if status.Code(err) == codes.Canceled { @@ -162,6 +187,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro return fmt.Errorf("failed to receive task from planner: %w", err) } + logger := log.With(b.logger, "task", protoTask.Task.Id) + b.metrics.taskStarted.Inc() start := time.Now() status := statusSuccess @@ -169,7 +196,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro newMetas, err := b.processTask(c.Context(), protoTask.Task) if err != nil { status = statusFailure - level.Error(b.logger).Log("msg", "failed to process task", "err", err) + level.Error(logger).Log("msg", "failed to process task", "err", err) } b.metrics.taskCompleted.WithLabelValues(status).Inc() @@ -197,13 +224,25 @@ func (b *Builder) notifyTaskCompletedToPlanner( CreatedMetas: metas, } - // TODO: Implement retry - if err := c.Send(&protos.BuilderToPlanner{ - BuilderID: b.ID, - Result: *result.ToProtoTaskResult(), - }); err != nil { + // We have a retry mechanism upper in the stack, but we add another one here + // to try our best to avoid losing the task result. + retries := backoff.New(c.Context(), b.cfg.BackoffConfig) + for retries.Ongoing() { + if err := c.Send(&protos.BuilderToPlanner{ + BuilderID: b.ID, + Result: *result.ToProtoTaskResult(), + }); err == nil { + break + } + + level.Error(b.logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err) + retries.Wait() + } + + if err := retries.Err(); err != nil { return fmt.Errorf("failed to acknowledge task completion to planner: %w", err) } + return nil } diff --git a/pkg/bloombuild/builder/builder_test.go b/pkg/bloombuild/builder/builder_test.go index 149e43f3234d..764e8cb6350f 100644 --- a/pkg/bloombuild/builder/builder_test.go +++ b/pkg/bloombuild/builder/builder_test.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "net" + "sync" "testing" "time" "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" @@ -26,6 +28,7 @@ import ( func Test_BuilderLoop(t *testing.T) { logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{ @@ -69,9 +72,17 @@ func Test_BuilderLoop(t *testing.T) { server, err := newFakePlannerServer(tasks) require.NoError(t, err) + // Start the server so the builder can connect and receive tasks. + server.Start() + limits := fakeLimits{} cfg := Config{ PlannerAddress: server.Addr(), + BackoffConfig: backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 5, + }, } flagext.DefaultValues(&cfg.GrpcConfig) @@ -87,10 +98,28 @@ func Test_BuilderLoop(t *testing.T) { err = services.StartAndAwaitRunning(context.Background(), builder) require.NoError(t, err) + // Wait for at least one task to be processed. require.Eventually(t, func() bool { - return int(server.completedTasks.Load()) == len(tasks) + return server.CompletedTasks() > 0 }, 5*time.Second, 100*time.Millisecond) + // Right after stop it so connection is broken, and builder will retry. + server.Stop() + + // While the server is stopped, the builder should keep retrying to connect but no tasks should be processed. + // Note this is just a way to sleep while making sure no tasks are processed. + tasksProcessedSoFar := server.CompletedTasks() + require.Never(t, func() bool { + return server.CompletedTasks() > tasksProcessedSoFar + }, 5*time.Second, 500*time.Millisecond) + + // Now we start the server so the builder can connect and receive tasks. + server.Start() + + require.Eventually(t, func() bool { + return server.CompletedTasks() >= len(tasks) + }, 30*time.Second, 500*time.Millisecond) + err = services.StopAndAwaitTerminated(context.Background(), builder) require.NoError(t, err) @@ -102,41 +131,62 @@ type fakePlannerServer struct { completedTasks atomic.Int64 shutdownCalled bool - addr string + listenAddr string grpcServer *grpc.Server + wg sync.WaitGroup } func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, err - } - server := &fakePlannerServer{ - tasks: tasks, - addr: lis.Addr().String(), - grpcServer: grpc.NewServer(), + tasks: tasks, } - protos.RegisterPlannerForBuilderServer(server.grpcServer, server) - go func() { - if err := server.grpcServer.Serve(lis); err != nil { - panic(err) - } - }() - return server, nil } func (f *fakePlannerServer) Addr() string { - return f.addr + if f.listenAddr == "" { + panic("server not started") + } + return f.listenAddr } func (f *fakePlannerServer) Stop() { - f.grpcServer.Stop() + if f.grpcServer != nil { + f.grpcServer.Stop() + } + + f.wg.Wait() +} + +func (f *fakePlannerServer) Start() { + f.Stop() + + lisAddr := "localhost:0" + if f.listenAddr != "" { + // Reuse the same address if the server was stopped and started again. + lisAddr = f.listenAddr + } + + lis, err := net.Listen("tcp", lisAddr) + if err != nil { + panic(err) + } + f.listenAddr = lis.Addr().String() + + f.grpcServer = grpc.NewServer() + protos.RegisterPlannerForBuilderServer(f.grpcServer, f) + go func() { + if err := f.grpcServer.Serve(lis); err != nil { + panic(err) + } + }() } func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error { + f.wg.Add(1) + defer f.wg.Done() + // Receive Ready if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive ready: %w", err) @@ -149,7 +199,8 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop if _, err := srv.Recv(); err != nil { return fmt.Errorf("failed to receive task response: %w", err) } - f.completedTasks.Add(1) + time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency. + f.completedTasks.Inc() } // No more tasks. Wait until shutdown. @@ -157,6 +208,10 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop return nil } +func (f *fakePlannerServer) CompletedTasks() int { + return int(f.completedTasks.Load()) +} + func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) { f.shutdownCalled = true return &protos.NotifyBuilderShutdownResponse{}, nil diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index 25cefa421522..d0c553104b09 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/grpcclient" ) @@ -11,12 +12,14 @@ import ( type Config struct { GrpcConfig grpcclient.Config `yaml:"grpc_config"` PlannerAddress string `yaml:"planner_address"` + BackoffConfig backoff.Config `yaml:"backoff_config"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) + cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f) } func (cfg *Config) Validate() error {