Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt publication with circuit breaker #713

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"github.com/filecoin-project/go-f3/certstore"
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/circuitbreaker"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/internal/writeaheadlog"
Expand Down Expand Up @@ -41,6 +42,7 @@

participant *gpbft.Participant
topic *pubsub.Topic
cb *circuitbreaker.CircuitBreaker

alertTimer *clock.Timer

Expand Down Expand Up @@ -86,6 +88,7 @@
ctxCancel: ctxCancel,
equivFilter: newEquivocationFilter(pID),
selfMessages: make(map[uint64]map[roundPhase][]*gpbft.GMessage),
cb: circuitbreaker.New(5, 3*time.Second),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on defaults?

}

// create a stopped timer to facilitate alerts requested from gpbft
Expand Down Expand Up @@ -444,7 +447,7 @@
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
}

err = h.topic.Publish(ctx, bw.Bytes())
err = h.publishWithCircuitBreaker(ctx, bw.Bytes())
if err != nil {
return fmt.Errorf("publishing message: %w", err)
}
Expand All @@ -463,12 +466,21 @@
if err := msg.MarshalCBOR(&bw); err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
}
if err := h.topic.Publish(h.runningCtx, bw.Bytes()); err != nil {
if err := h.publishWithCircuitBreaker(h.runningCtx, bw.Bytes()); err != nil {
return fmt.Errorf("publishing message: %w", err)
}
return nil
}

func (h *gpbftRunner) publishWithCircuitBreaker(ctx context.Context, msg []byte) error {
if h.topic == nil {
return nil

Check warning on line 477 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L477

Added line #L477 was not covered by tests
}
return h.cb.Run(func() error {
return h.topic.Publish(ctx, msg)
})
}

var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage

func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg *pubsub.Message) (_result pubsub.ValidationResult) {
Expand Down
109 changes: 109 additions & 0 deletions internal/circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package circuitbreaker

import (
"errors"
"fmt"
"sync"
"time"
)

const (
Closed Status = iota
Open
HalfOpen
)

// ErrOpen signals that the circuit is open. See CircuitBreaker.Run.
var ErrOpen = errors.New("circuit breaker is open")

type Status int

type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration

// mu guards access to status, lastFailure and failures.
mu sync.Mutex
failures int
lastFailure time.Time
status Status
}

// New creates a new CircuitBreaker instance with the specified maximum number
// of failures and a reset timeout duration.
//
// See CircuitBreaker.Run.
func New(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
}
}

// Run attempts to execute the provided function within the context of the
// circuit breaker. It handles state transitions, Closed, Open, or HalfOpen,
// based on the outcome of the attempt.
//
// If the circuit is in the Open state, and not enough time has passed since the
// last failure, the circuit remains open, and the function returns
// ErrOpen without attempting the provided function. If enough time
// has passed, the circuit transitions to HalfOpen, and one attempt is allowed.
//
// In HalfOpen state if the function is executed and returns an error, the
// circuit breaker will transition back to Open status. Otherwise, if the
// function executes successfully, the circuit resets to the Closed state, and
// the failure count is reset to zero.
//
// Example:
//
// cb := NewCircuitBreaker(3, time.Second)
// switch err := cb.Run(func() error {
// // Your attempt logic here
// return nil
// }); {
// case errors.Is(err, ErrCircuitBreakerOpen):
// // No execution attempt was made since the circuit is open.
// case err != nil:
// // Execution attempt failed.
// default:
// // Execution attempt succeeded.
// }
func (cb *CircuitBreaker) Run(attempt func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.status {
case Open:
if time.Since(cb.lastFailure) < cb.resetTimeout {
// Not enough time has passed since the circuit opened. Do not make any further
// attempts.
return ErrOpen
}
// Enough time has passed since last failure. Proceed to allow one attempt by
// half-opening the circuit.
cb.status = HalfOpen
fallthrough
case HalfOpen, Closed:
if err := attempt(); err != nil {
cb.failures++
if cb.failures >= cb.maxFailures {
// Trip the circuit as we are at or above the max failure threshold.
cb.status = Open
cb.lastFailure = time.Now()
}
return err
}
// Reset the circuit since the attempt succeeded.
cb.status = Closed
cb.failures = 0
return nil
default:
return fmt.Errorf("unknown status: %d", cb.status)

Check warning on line 100 in internal/circuitbreaker/circuitbreaker.go

View check run for this annotation

Codecov / codecov/patch

internal/circuitbreaker/circuitbreaker.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}
}

// GetStatus returns the current status of the CircuitBreaker.
func (cb *CircuitBreaker) GetStatus() Status {
cb.mu.Lock()
defer cb.mu.Unlock()
return cb.status
}
102 changes: 102 additions & 0 deletions internal/circuitbreaker/circuitbreaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package circuitbreaker_test

import (
"errors"
"sync"
"testing"
"time"

"github.com/filecoin-project/go-f3/internal/circuitbreaker"
"github.com/stretchr/testify/require"
)

func TestCircuitBreaker(t *testing.T) {
t.Parallel()

const (
maxFailures = 3
restTimeout = 10 * time.Millisecond

eventualTimeout = restTimeout * 2
eventualTick = restTimeout / 5
)

var (
failure = errors.New("fish out of water")

succeed = func() error { return nil }
fail = func() error { return failure }
trip = func(t *testing.T, subject *circuitbreaker.CircuitBreaker) {
for range maxFailures {
require.ErrorContains(t, subject.Run(fail), "fish")
}
require.Equal(t, circuitbreaker.Open, subject.GetStatus())
}
)

t.Run("closed on no error", func(t *testing.T) {
t.Parallel()
subject := circuitbreaker.New(maxFailures, restTimeout)
require.NoError(t, subject.Run(succeed))
require.Equal(t, circuitbreaker.Closed, subject.GetStatus())
})

t.Run("opens after max failures and stays open", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
trip(t, subject)

// Assert that immediate runs fail, without being attempted, even if they would
// be successful until restTimeout has elapsed.
err := subject.Run(succeed)
require.ErrorIs(t, err, circuitbreaker.ErrOpen)
require.Equal(t, circuitbreaker.Open, subject.GetStatus())
})

t.Run("half-opens eventually", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
trip(t, subject)
require.ErrorIs(t, subject.Run(fail), circuitbreaker.ErrOpen)
// Assert that given function is eventually run after circuit is tripped at
// half-open status by checking error type.
require.Eventually(t, func() bool { return errors.Is(subject.Run(fail), failure) }, eventualTimeout, eventualTick)
})

t.Run("closes after rest timeout and success", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
trip(t, subject)

require.Eventually(t, func() bool { return subject.Run(succeed) == nil }, eventualTimeout, eventualTick)
require.Equal(t, circuitbreaker.Closed, subject.GetStatus())
})

t.Run("usable concurrently", func(t *testing.T) {
subject := circuitbreaker.New(maxFailures, restTimeout)
const (
wantSuccesses = 7
totalAttempts = 1_000
)
var (
successes, failures int
wg sync.WaitGroup
)
for range totalAttempts {
wg.Add(1)
go func() {
defer wg.Done()
_ = subject.Run(func() error {
// Unsafely increment/decrement counters so that if Run is not synchronised
// properly the test creates a race condition.
if successes < wantSuccesses {
successes++
return nil
}
failures++
return errors.New("error")
})
}()
}
wg.Wait()
require.Equal(t, wantSuccesses, successes)
require.Equal(t, maxFailures, failures)
})
}
Loading