Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
"github.com/google/uuid"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -104,8 +104,6 @@ func (c *backendClient) waitForOrchestrationCondition(ctx context.Context, id ap
MaxInterval: 10 * time.Second,
Multiplier: 1.5,
RandomizationFactor: 0.05,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to me this could be rewriten to use backoff.Retry as well.

The only difference: the backoff library doesn't start by waiting for a tick.

Expand Down
5 changes: 1 addition & 4 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -477,10 +477,7 @@ func (g *grpcExecutor) waitForInstance(ctx context.Context, req *protos.GetInsta
MaxInterval: 3 * time.Second,
Multiplier: 1.5,
RandomizationFactor: 0.5,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b = backoff.WithContext(b, ctx)
b.Reset()

loop:
Expand Down
5 changes: 1 addition & 4 deletions backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
"github.com/marusama/semaphore/v2"
)

Expand Down Expand Up @@ -99,10 +99,7 @@ func (w *worker) Start(ctx context.Context) {
MaxInterval: 5 * time.Second,
Multiplier: 1.05,
RandomizationFactor: 0.05,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b = backoff.WithContext(b, ctx)
b.Reset()

loop:
Expand Down
30 changes: 13 additions & 17 deletions client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -70,20 +70,18 @@ func (c *TaskHubGrpcClient) FetchOrchestrationMetadata(ctx context.Context, id a
//
// api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist.
func (c *TaskHubGrpcClient) WaitForOrchestrationStart(ctx context.Context, id api.InstanceID, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
var resp *protos.GetInstanceResponse
var err error
err = backoff.Retry(func() error {
resp, err := backoff.Retry(ctx, func() (*protos.GetInstanceResponse, error) {
req := makeGetInstanceRequest(id, opts)
resp, err = c.client.WaitForInstanceStart(ctx, req)
resp, err := c.client.WaitForInstanceStart(ctx, req)
if err != nil {
// if its context cancelled stop retrying
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
return nil, backoff.Permanent(ctx.Err())
}
return fmt.Errorf("failed to wait for orchestration start: %w", err)
return nil, fmt.Errorf("failed to wait for orchestration start: %w", err)
}
return nil
}, backoff.WithContext(newInfiniteRetries(), ctx))
return resp, nil
}, backoff.WithBackOff(newInfiniteRetries()))
if err != nil {
return nil, err
}
Expand All @@ -95,20 +93,18 @@ func (c *TaskHubGrpcClient) WaitForOrchestrationStart(ctx context.Context, id ap
//
// api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist.
func (c *TaskHubGrpcClient) WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
var resp *protos.GetInstanceResponse
var err error
err = backoff.Retry(func() error {
resp, err := backoff.Retry(ctx, func() (*protos.GetInstanceResponse, error) {
req := makeGetInstanceRequest(id, opts)
resp, err = c.client.WaitForInstanceCompletion(ctx, req)
resp, err := c.client.WaitForInstanceCompletion(ctx, req)
if err != nil {
// if its context cancelled stop retrying
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
return nil, backoff.Permanent(ctx.Err())
}
return fmt.Errorf("failed to wait for orchestration completion: %w", err)
return nil, fmt.Errorf("failed to wait for orchestration completion: %w", err)
}
return nil
}, backoff.WithContext(newInfiniteRetries(), ctx))
return resp, nil
}, backoff.WithBackOff(newInfiniteRetries()))
if err != nil {
return nil, err
}
Expand Down
16 changes: 7 additions & 9 deletions client/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"io"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/internal/helpers"
Expand Down Expand Up @@ -90,23 +90,23 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T
return
}

err = backoff.Retry(
func() error {
_, err = backoff.Retry[any](ctx,
func() (any, error) {
// user wants to stop the listener
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
return nil, backoff.Permanent(ctx.Err())
}

c.logger.Infof("reconnecting work item listener stream")
streamErr := initStream()
if streamErr != nil {
c.logger.Errorf("error initializing work item listener stream %v", streamErr)
return streamErr
return nil, streamErr
}
return nil
return nil, nil
},
// retry forever since we don't have a way of asynchronously return errors to the user
newInfiniteRetries(),
backoff.WithBackOff(newInfiniteRetries()),
)
if err != nil {
c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err)
Expand Down Expand Up @@ -205,8 +205,6 @@ func newInfiniteRetries() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
// max wait of 15 seconds between retries
b.MaxInterval = 15 * time.Second
Copy link

Copilot AI Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removal of b.MaxElapsedTime = 0 breaks the infinite retry behavior. In backoff v5, the default MaxElapsedTime is 15 minutes, so retries will stop after that duration. You need to set b.MaxElapsedTime = 0 to maintain infinite retries as indicated by the function name and comment.

Suggested change
b.MaxInterval = 15 * time.Second
b.MaxInterval = 15 * time.Second
b.MaxElapsedTime = 0 // infinite retries

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this has been moved to Retry. There is no max elapsed time tracking in the backoff setting anymore.

// retry forever
b.MaxElapsedTime = 0
b.Reset()
return b
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/microsoft/durabletask-go
go 1.23.0

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cenkalti/backoff/v5 v5.0.3
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v5 v5.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Loading