fix(f3): poll the lease by repeatedly participating instead of checking progress #12640

Merged
merged 4 commits into from
Oct 25, 2024
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -6,7 +6,6 @@
- Update `EthGetBlockByNumber` to return a pointer to ethtypes.EthBlock or nil for null rounds. ([filecoin-project/lotus#12529](https://github.com/filecoin-project/lotus/pull/12529))
- Reduce size of embedded genesis CAR files by removing WASM actor blocks and compressing with zstd. This reduces the `lotus` binary size by approximately 10 MiB. ([filecoin-project/lotus#12439](https://github.com/filecoin-project/lotus/pull/12439))
- Add ChainSafe operated Calibration archival node to the bootstrap list ([filecoin-project/lotus#12517](https://github.com/filecoin-project/lotus/pull/12517))
- Fix hotloop in F3 participation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575))
- `lotus chain head` now supports a `--height` flag to print just the epoch number of the current chain head ([filecoin-project/lotus#12609](https://github.com/filecoin-project/lotus/pull/12609))
- `lotus-shed indexes inspect-indexes` now performs a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. Previously `inspect-indexes` simply compared event counts, comparing AMT roots confirms all the event data is byte-perfect. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570))
- Expose APIs to list the miner IDs that are currently participating in F3 via node. ([filecoin-project/lotus#12608](https://github.com/filecoin-project/lotus/pull/12608))
@@ -16,9 +15,10 @@
- Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567))
- Event APIs (Eth events and actor events) should only return reverted events if client queries by specific block hash / tipset. Eth and actor event subscription APIs should always return reverted events to enable accurate observation of real-time changes. ([filecoin-project/lotus#12585](https://github.com/filecoin-project/lotus/pull/12585))
- Add logic to check if the miner's owner address is delegated (f4 address). If it is delegated, the `lotus-shed sectors termination-estimate` command now sends the termination state call using the worker ID. This fix resolves the issue where termination-estimate did not function correctly for miners with delegated owner addresses. ([filecoin-project/lotus#12569](https://github.com/filecoin-project/lotus/pull/12569))
- Fix hotloop in F3 participation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575))
- Fix a bug in F3 participation API where valid leases may get removed due to dynamic manifest update. ([filecoin-project/lotus#12597](https://github.com/filecoin-project/lotus/pull/12597))

- Change the F3 participation ticket encoding to allow parity testing across non-Go implementations, where a ticket issued by Lotus may need to be decoded by, for example, Forest. The changes also enforce the minimum instance participation of 1 for miners. ([filecoin-project/lotus#12615](https://github.com/filecoin-project/lotus/pull/12615))
- Fix issue where F3 wouldn't start participating again if Lotus restarted without restarting the Miner ([filecoin-project/lotus#12640](https://github.com/filecoin-project/lotus/pull/12640)).

## Deps

134 changes: 67 additions & 67 deletions chain/lf3/participation.go
@@ -11,7 +11,6 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/lotus/api"
@@ -37,7 +36,6 @@ const (
type F3ParticipationAPI interface {
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
}

@@ -46,7 +44,6 @@ type Participant struct {
participant address.Address
backoff *backoff.Backoff
maxCheckProgressAttempts int
previousTicket api.F3ParticipationTicket
leaseTerm uint64

runningCtx context.Context
@@ -92,21 +89,18 @@ func (p *Participant) run(ctx context.Context) (_err error) {
}
}()

var ticket api.F3ParticipationTicket
for ctx.Err() == nil {
var err error
start := time.Now()
ticket, err := p.tryGetF3ParticipationTicket(ctx)
ticket, err = p.tryGetF3ParticipationTicket(ctx, ticket)
if err != nil {
return err
}
lease, participating, err := p.tryF3Participate(ctx, ticket)
err = p.tryParticipate(ctx, ticket)
if err != nil {
return err
}
if participating {
if err := p.awaitLeaseExpiry(ctx, lease); err != nil {
return err
}
}
const minPeriod = 500 * time.Millisecond
if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod {
select {
@@ -120,10 +114,10 @@ func (p *Participant) run(ctx context.Context) (_err error) {
return ctx.Err()
}

func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) {
func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context, previousTicket api.F3ParticipationTicket) (api.F3ParticipationTicket, error) {
p.backoff.Reset()
for ctx.Err() == nil {
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); {
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, previousTicket, p.leaseTerm); {
case ctx.Err() != nil:
return api.F3ParticipationTicket{}, ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
@@ -142,25 +136,51 @@ func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3Pa
return api.F3ParticipationTicket{}, ctx.Err()
}

func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
func (p *Participant) getManifest(ctx context.Context) (*manifest.Manifest, error) {
p.backoff.Reset()
for ctx.Err() == nil {
switch manifest, err := p.node.F3GetManifest(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return nil, xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
log.Errorw("Error when fetching F3 manifest. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
case manifest == nil:
// Can happen if we reboot and have no manifest.
log.Warnw("Received no F3 manifest from lotus. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration())
default:
return manifest, nil
}
p.backOff(ctx)
}
return nil, ctx.Err()
}

func (p *Participant) tryParticipate(ctx context.Context, ticket api.F3ParticipationTicket) error {
p.backoff.Reset()
renewLeaseWithin := p.leaseTerm / 2
var (
manifest *manifest.Manifest
haveLease bool
)
for ctx.Err() == nil {
switch lease, err := p.node.F3Participate(ctx, ticket); {
lease, err := p.node.F3Participate(ctx, ticket)
switch {
case ctx.Err() != nil:
return api.F3ParticipationLease{}, false, ctx.Err()
return ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
@@ -171,69 +191,49 @@ func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3Partici
p.backOff(ctx)
continue
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
return nil
}
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
continue
case lease.ValidityTerm <= renewLeaseWithin:
return nil
default:
// we succeeded so reset the backoff.
p.backoff.Reset()
}

// Log the first time we give out the lease.
if !haveLease {
log.Infow("Successfully acquired F3 participation lease.",
"issuer", lease.Issuer,
"not-before", lease.FromInstance,
"not-after", lease.ToInstance(),
)
p.previousTicket = ticket
return lease, true, nil
haveLease = true
}
}
return api.F3ParticipationLease{}, false, ctx.Err()
}

func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error {
p.backoff.Reset()
renewLeaseWithin := p.leaseTerm / 2
for ctx.Err() == nil {
manifest, err := p.node.F3GetManifest(ctx)
switch {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
return nil
// Fetch the manifest if necessary.
if manifest == nil || lease.Network != manifest.NetworkName {
manifest, err = p.getManifest(ctx)
if err != nil {
return err
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
continue
case manifest == nil || manifest.NetworkName != lease.Network:
// If we got an unexpected manifest, or no manifest, go back to the
// beginning and try to get another ticket. Switching from having a manifest
// to having no manifest can theoretically happen if the lotus node reboots
// and has no static manifest.
return nil
}
switch progress, err := p.node.F3GetProgress(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
if manifest.NetworkName != lease.Network {
log.Warnf("Got a manifest for network %q while waiting for a lease on network %q. Getting another ticket.", manifest.NetworkName, lease.Network)
return nil
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
case progress.ID+renewLeaseWithin >= lease.ToInstance():
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
return nil
default:
remainingInstanceLease := lease.ToInstance() - progress.ID
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
p.backOffFor(ctx, waitTime)
}

// Wait until we think we may need to renew the lease.
waitTime := time.Duration(lease.ValidityTerm-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", lease.ValidityTerm, waitTime)
p.backOffFor(ctx, waitTime)
}
return ctx.Err()
}
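The renewal and wait logic in the new `tryParticipate` loop can be sketched in isolation. This is a minimal, standalone illustration and not the code from the PR: `shouldRenew` and `waitBeforeRecheck` are hypothetical helper names, and the early return of zero is an added guard against unsigned underflow (in the diff, the `lease.ValidityTerm <= renewLeaseWithin` case returns before the wait is computed).

```go
package main

import (
	"fmt"
	"time"
)

// shouldRenew reports whether the remaining validity has dropped to half of
// the requested lease term or less. In the diff this is the point where the
// loop returns so that a new ticket can be requested.
// (Hypothetical helper, not part of the PR.)
func shouldRenew(validityTerm, leaseTerm uint64) bool {
	return validityTerm <= leaseTerm/2
}

// waitBeforeRecheck estimates how long to sleep before calling participate
// again: one catch-up alignment period per instance that remains before the
// renewal threshold, with a 100ms floor when the product is zero.
// (Hypothetical helper, not part of the PR.)
func waitBeforeRecheck(validityTerm, leaseTerm uint64, catchUpAlignment time.Duration) time.Duration {
	if shouldRenew(validityTerm, leaseTerm) {
		// Assumption: callers renew instead of waiting, so avoid the
		// unsigned subtraction below.
		return 0
	}
	wait := time.Duration(validityTerm-leaseTerm/2) * catchUpAlignment
	if wait == 0 {
		wait = 100 * time.Millisecond
	}
	return wait
}

func main() {
	const leaseTerm = 10
	for _, validity := range []uint64{10, 7, 5} {
		fmt.Printf("validity=%d renew=%t wait=%s\n",
			validity,
			shouldRenew(validity, leaseTerm),
			waitBeforeRecheck(validity, leaseTerm, time.Second))
	}
}
```

Because the node reports the remaining validity on every `F3Participate` call, the participant no longer needs a separate progress query to decide when to renew.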
17 changes: 12 additions & 5 deletions chain/lf3/participation_lease.go
@@ -101,18 +101,25 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat
l.mutex.Lock()
defer l.mutex.Unlock()
currentLease, found := l.leases[newLease.MinerID]
if found && currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
if !found {
if found {
// short-circuit for reparticipation.
if currentLease == newLease {
return newLease, nil
}
if currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
} else {
log.Infof("started participating in F3 for miner %d", newLease.MinerID)
}
l.leases[newLease.MinerID] = newLease
select {
case l.notifyParticipation <- struct{}{}:
default:
}
newLease.ValidityTerm = newLease.ToInstance() - instant.ID
newLease.FromInstance = instant.ID
return newLease, nil
}
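The last two assignments above rebase the returned lease onto the current instance, so the caller sees how many instances remain instead of the originally granted term. A minimal sketch of that arithmetic follows. The `lease` type and `rebase` function are simplified, hypothetical stand-ins for `api.F3ParticipationLease` and the code in `participate`, and the sketch assumes `ToInstance()` is `FromInstance + ValidityTerm`.

```go
package main

import "fmt"

// lease is a simplified, hypothetical stand-in for api.F3ParticipationLease.
type lease struct {
	FromInstance uint64
	ValidityTerm uint64
}

// toInstance is assumed to mirror ToInstance(): the instance at which the
// lease stops being valid.
func (l lease) toInstance() uint64 { return l.FromInstance + l.ValidityTerm }

// rebase returns a copy of the lease expressed relative to the current
// instance. The end instance is computed before FromInstance is overwritten,
// matching the order of the two assignments in the diff.
func rebase(l lease, current uint64) lease {
	end := l.toInstance()
	if current >= end {
		// Assumption: an expired lease is rejected before this point in the
		// real code; the sketch returns a zero term to avoid underflow.
		return lease{FromInstance: current}
	}
	return lease{FromInstance: current, ValidityTerm: end - current}
}

func main() {
	granted := lease{FromInstance: 10, ValidityTerm: 5}
	for _, current := range []uint64{10, 12, 15} {
		fmt.Printf("current=%d rebased=%+v\n", current, rebase(granted, current))
	}
}
```

Note that in the diff the lease stored in `l.leases` is written before the rebase, so only the returned copy is adjusted.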

3 changes: 2 additions & 1 deletion chain/lf3/participation_lease_test.go
@@ -32,7 +32,8 @@ func TestLeaser(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(123), lease.MinerID)
require.Equal(t, issuer.String(), lease.Issuer)
require.Equal(t, uint64(5), lease.ValidityTerm) // Current instance (10) + offset (5)
require.Equal(t, uint64(10), lease.FromInstance) // Current instance (10)
require.Equal(t, uint64(5), lease.ValidityTerm) // Remaining term: the offset (5)
})
t.Run("get participants", func(t *testing.T) {
progress.currentInstance = 11
79 changes: 74 additions & 5 deletions chain/lf3/participation_test.go
@@ -7,10 +7,10 @@ import (
"time"

"github.com/jpillora/backoff"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/lotus/api"
@@ -41,10 +41,6 @@ func (m *manifestFailAPI) F3GetOrRenewParticipationTicket(ctx context.Context, m
}
}

func (m *manifestFailAPI) F3GetProgress(ctx context.Context) (gpbft.Instant, error) {
return gpbft.Instant{}, nil
}

func (m *manifestFailAPI) F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) {
return api.F3ParticipationLease{
Network: "test",
@@ -73,3 +69,76 @@ func TestParticipantManifestFailure(t *testing.T) {
<-api.manifestRequested
require.NoError(t, p.Stop(context.Background()))
}

type repeatedParticipateAPI struct {
secondTicket chan struct{}
instance uint64
t *testing.T
}

func (m *repeatedParticipateAPI) F3GetManifest(ctx context.Context) (*manifest.Manifest, error) {
return &manifest.Manifest{
NetworkName: "test",
CatchUpAlignment: time.Millisecond,
}, nil
}

func (m *repeatedParticipateAPI) F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) {
switch string(previous) {
case "first ticket":
return api.F3ParticipationTicket("second ticket"), nil
case "":
return api.F3ParticipationTicket("first ticket"), nil
default:
panic("unexpected ticket")
}
}

func (m *repeatedParticipateAPI) F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) {
switch string(ticket) {
case "first ticket":
case "second ticket":
// This is 6, not 5, because we expect one final call to participate before getting
// a new ticket.
assert.Equal(m.t, uint64(6), m.instance)
close(m.secondTicket)
return api.F3ParticipationLease{}, api.ErrF3ParticipationIssuerMismatch
default:
m.t.Errorf("unexpected f3 ticket: %s", string(ticket))
return api.F3ParticipationLease{}, api.ErrF3Disabled
}

if m.instance >= 10 {
m.t.Error("did not expect the participant to continue past the half-way point")
return api.F3ParticipationLease{}, api.ErrF3Disabled
}

lease := api.F3ParticipationLease{
Network: "test",
Issuer: "foobar",
MinerID: 0,
FromInstance: m.instance,
ValidityTerm: 10 - m.instance,
}
m.instance++

return lease, nil
}

// Make sure we keep calling participate until our validity term drops to half (5) of the initial
// term (10). At that point, the participant should request a new ticket.
func TestParticipantRepeat(t *testing.T) {
api := &repeatedParticipateAPI{secondTicket: make(chan struct{}), t: t}
addr, err := address.NewIDAddress(1000)
require.NoError(t, err)

p := lf3.NewParticipant(context.Background(), api, dtypes.MinerAddress(addr),
&backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
}, 13, 10)
require.NoError(t, p.Start(context.Background()))
<-api.secondTicket
require.NoError(t, p.Stop(context.Background()))
}
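The expectation in `TestParticipantRepeat` that the second ticket arrives when the mock's instance counter is 6 (not 5) can be checked with a small simulation. This is an illustrative sketch only: `callsBeforeRenewal` is a hypothetical function that imitates the mock API (validity is `leaseTerm - instance`, and the counter is incremented on every call) together with the participant's renewal rule (`validity <= leaseTerm/2`).

```go
package main

import "fmt"

// callsBeforeRenewal simulates repeated participate calls against a mock that
// reports validity = leaseTerm - instance and then advances the instance by
// one. It returns the value of the instance counter at the moment the
// participant decides to fetch a new ticket.
// (Hypothetical helper written for this illustration.)
func callsBeforeRenewal(leaseTerm uint64) uint64 {
	renewWithin := leaseTerm / 2
	var instance uint64
	for {
		validity := leaseTerm - instance // what the mock lease would report
		instance++                       // the mock advances on every call
		if validity <= renewWithin {
			// The participant stops polling and requests a new ticket.
			return instance
		}
	}
}

func main() {
	fmt.Println("instance counter when the second ticket is used:", callsBeforeRenewal(10))
}
```

With a term of 10, the call that observes a validity of 5 is the sixth call, and it leaves the counter at 6, which is the one final participate call the test comment refers to.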