diff --git a/chain/lf3/ec.go b/chain/lf3/ec.go index 38812886c1d..7c6f39740cb 100644 --- a/chain/lf3/ec.go +++ b/chain/lf3/ec.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/power" @@ -95,6 +96,29 @@ func (ts *f3TipSet) Timestamp() time.Time { return time.Time{} } +func (ec *ecWrapper) runPrefetcher(ctx context.Context) { + // We have a loop here because we'll be automatically unsubscribed if we're too slow. + for { + headChanges := ec.chainStore.SubHeadChanges(ctx) + for changes := range headChanges { + for _, head := range changes { + if head.Type == store.HCRevert { + continue + } + // We do one job at a time because we're just pre-fetching here. If, + // e.g., we end up in "catch up" mode, we'll be unsubscribed (too + // slow) then we'll try again after we wait for one block. + _, _ = ec.getPowerTableLotusTSK(ctx, head.Val.Parents()) + } + } + select { + case <-time.After(time.Duration(buildconstants.BlockDelaySecs) * time.Second): + case <-ctx.Done(): + return + } + } +} + // GetTipsetByEpoch should return a tipset before the one requested if the requested // tipset does not exist due to null epochs func (ec *ecWrapper) GetTipsetByEpoch(ctx context.Context, epoch int64) (ec.TipSet, error) { diff --git a/chain/lf3/f3.go b/chain/lf3/f3.go index 81b3b03bd3d..bbc4a7d63d1 100644 --- a/chain/lf3/f3.go +++ b/chain/lf3/f3.go @@ -106,20 +106,38 @@ func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) OnStop: fff.inner.Stop, }) - // Start signing F3 messages. lCtx, cancel := context.WithCancel(mctx) - doneCh := make(chan struct{}) + + // start EC wrapper power-table prefetch logic. + donePrefetchingCh := make(chan struct{}) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + go func() { + defer close(donePrefetchingCh) + ec.runPrefetcher(lCtx) + }() + return nil + }, + OnStop: func(context.Context) error { + cancel() + <-donePrefetchingCh + return nil + }, + }) + + // Start signing F3 messages. + doneSigningCh := make(chan struct{}) lc.Append(fx.Hook{ OnStart: func(context.Context) error { go func() { - defer close(doneCh) + defer close(doneSigningCh) fff.runSigningLoop(lCtx) }() return nil }, OnStop: func(context.Context) error { cancel() - <-doneCh + <-doneSigningCh return nil }, })