Skip to content

Commit

Permalink
Chore: Backports to the release/v1.26.0 branch (#11713)
Browse files Browse the repository at this point in the history
* enable storing events (#11712)

* fix: commit batch: Always go through commit batcher (#11704)

* fix: commit batch: Always go through commit batcher

* fix sealing fsm tests

* sealing pipeline: Fix panic on padding pieces in WaitDeals (#11708)

* sealing pipeline: Fix panic on padding pieces in WaitDeals

* sealing pipeline: Catch panics

* sealing pipeline: Output DDO pieces in SectorStatus (#11709)

* sealing pipeline: Fix failing ProveCommit3 aggregate (#11710)

* itests: Repro failing ProveCommit3 aggregate

* commit batch: Correctly sort sectors in processBatchV2

* fix imports

* ci: Bigger instance for sector_pledge test

* itests: Use Must-Post mining in TestPledgeBatching

---------

Co-authored-by: Aarsh Shah <[email protected]>
Co-authored-by: Łukasz Magiera <[email protected]>
  • Loading branch information
3 people authored Mar 13, 2024
1 parent ea2d079 commit f929ae1
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 127 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ workflows:
- build
suite: itest-sector_pledge
target: "./itests/sector_pledge_test.go"
resource_class: 2xlarge
get-params: true

- test:
Expand Down
2 changes: 1 addition & 1 deletion .circleci/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ workflows:
- build
suite: itest-[[ $name ]]
target: "./itests/[[ $file ]]"
[[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config")]]
[[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config") (eq $name "sector_pledge")]]
resource_class: 2xlarge
[[- end]]
[[- if or (eq $name "wdpost") (eq $name "sector_pledge")]]
Expand Down
8 changes: 7 additions & 1 deletion itests/kit/ensemble_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ var DefaultEnsembleOpts = ensembleOpts{
}

// MockProofs activates mock proofs for the entire ensemble.
func MockProofs() EnsembleOpt {
func MockProofs(e ...bool) EnsembleOpt {
if len(e) > 0 && !e[0] {
return func(opts *ensembleOpts) error {
return nil
}
}

return func(opts *ensembleOpts) error {
opts.mockProofs = true
// since we're using mock proofs, we don't need to download
Expand Down
20 changes: 15 additions & 5 deletions itests/sector_pledge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"

"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestPledgeSectors(t *testing.T) {
defer cancel()

_, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll().BeginMiningMustPost(blockTime)

miner.PledgeSectors(ctx, nSectors, 0, nil)
}
Expand All @@ -65,12 +66,18 @@ func TestPledgeBatching(t *testing.T) {
//stm: @SECTOR_PRE_COMMIT_FLUSH_001, @SECTOR_COMMIT_FLUSH_001
blockTime := 50 * time.Millisecond

runTest := func(t *testing.T, nSectors int) {
runTest := func(t *testing.T, nSectors int, aggregate bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()

client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(!aggregate), kit.MutateSealingConfig(func(sc *config.SealingConfig) {
if aggregate {
sc.AggregateAboveBaseFee = types.FIL(big.Zero())
}
}))
ens.InterconnectAll().BeginMiningMustPost(blockTime)

client.WaitTillChain(ctx, kit.HeightAtLeast(10))

Expand Down Expand Up @@ -114,7 +121,10 @@ func TestPledgeBatching(t *testing.T) {
}

t.Run("100", func(t *testing.T) {
runTest(t, 100)
runTest(t, 100, false)
})
t.Run("10-agg", func(t *testing.T) {
runTest(t, 10, true)
})
}

Expand Down
2 changes: 1 addition & 1 deletion node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func ConfigFullNode(c interface{}) Option {
// If the Eth JSON-RPC is enabled, enable storing events at the ChainStore.
// This is the case even if real-time and historic filtering are disabled,
// as it enables us to serve logs in eth_getTransactionReceipt.
If(cfg.Fevm.EnableEthRPC, Override(StoreEventsKey, modules.EnableStoringEvents)),
If(cfg.Fevm.EnableEthRPC || cfg.Events.EnableActorEventsAPI, Override(StoreEventsKey, modules.EnableStoringEvents)),

Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),

Expand Down
19 changes: 9 additions & 10 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return nil, xerrors.Errorf("getting config: %w", err)
}

if notif && total < cfg.MaxCommitBatch {
if notif && total < cfg.MaxCommitBatch && cfg.AggregateCommits {
return nil, nil
}

Expand All @@ -233,7 +233,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return false
}

individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut()
individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() || !cfg.AggregateCommits

if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
if ts.MinTicketBlock().ParentBaseFee.LessThan(cfg.AggregateAboveBaseFee) {
Expand Down Expand Up @@ -331,6 +331,9 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
return nil, err
}

// sort sectors by number
sort.Slice(sectors, func(i, j int) bool { return sectors[i] < sectors[j] })

total := len(sectors)

res := sealiface.CommitBatchRes{
Expand Down Expand Up @@ -371,10 +374,6 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
return nil, nil
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Number < infos[j].Number
})

proofs := make([][]byte, 0, total)
for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].Proof)
Expand Down Expand Up @@ -444,13 +443,13 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors2Params: %w", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors3Params: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch message failed: %s", err)
log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err)
}
Expand All @@ -474,7 +473,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto

res.Msg = &mcid

log.Infow("Sent ProveCommitSectors2 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
log.Infow("Sent ProveCommitSectors3 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))

return []sealiface.CommitBatchRes{res}, nil
}
Expand Down Expand Up @@ -591,7 +590,7 @@ func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.Secto
_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch message failed: %s", err)
log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err)
}
Expand Down
30 changes: 25 additions & 5 deletions storage/pipeline/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"os"
"reflect"
"runtime"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -39,8 +40,27 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
return nil, processed, nil
}

return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
return func(ctx statemachine.Context, si SectorInfo) (err error) {
// handle panics
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
n := runtime.Stack(buf, false)
buf = buf[:n]

l := Log{
Timestamp: uint64(time.Now().Unix()),
Message: fmt.Sprintf("panic: %v\n%s", r, buf),
Kind: "panic",
}
si.logAppend(l)

err = fmt.Errorf("panic: %v\n%s", r, buf)
}
}()

// execute the next state
err = next(ctx, si)
if err != nil {
log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
return nil
Expand Down Expand Up @@ -127,8 +147,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalized{}, SubmitCommitAggregate),
on(SectorFinalizedAvailable{}, SubmitCommitAggregate),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
Expand Down Expand Up @@ -674,7 +694,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
}
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
state.State = SubmitCommitAggregate
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
Expand Down
23 changes: 10 additions & 13 deletions storage/pipeline/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,18 @@ func TestHappyPath(t *testing.T) {
require.Equal(m.t, m.state.State, Committing)

m.planSingle(SectorCommitted{})
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitAggregateWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving}
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -135,9 +135,6 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)

m.planSingle(SectorSubmitCommitAggregate{})
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitAggregateSent{})
Expand All @@ -149,7 +146,7 @@ func TestHappyPathFinalizeEarly(t *testing.T) {
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -188,9 +185,9 @@ func TestCommitFinalizeFailed(t *testing.T) {
require.Equal(m.t, m.state.State, CommitFinalize)

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommit}
expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommitAggregate}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
Expand Down Expand Up @@ -242,10 +239,10 @@ func TestSeedRevert(t *testing.T) {
// not changing the seed this time
_, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
require.NoError(t, err)
require.Equal(m.t, m.state.State, SubmitCommit)
require.Equal(m.t, m.state.State, SubmitCommitAggregate)

m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitAggregateWait)

m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)
Expand Down
26 changes: 20 additions & 6 deletions storage/pipeline/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
for _, piece := range sector.Pieces {
used += piece.Piece().Size.Unpadded()

if !piece.HasDealInfo() {
continue
}

endEpoch, err := piece.EndEpoch()
if err != nil {
return xerrors.Errorf("piece.EndEpoch: %w", err)
}

if piece.HasDealInfo() && endEpoch > lastDealEnd {
if endEpoch > lastDealEnd {
lastDealEnd = endEpoch
}
}
Expand Down Expand Up @@ -953,20 +957,30 @@ func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showO
return api.SectorInfo{}, err
}

nv, err := m.Api.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return api.SectorInfo{}, xerrors.Errorf("getting network version: %w", err)
}

deals := make([]abi.DealID, len(info.Pieces))
pieces := make([]api.SectorPiece, len(info.Pieces))
for i, piece := range info.Pieces {
// todo make this work with DDO deals in some reasonable way

pieces[i].Piece = piece.Piece()
if !piece.HasDealInfo() || piece.Impl().PublishCid == nil {

if !piece.HasDealInfo() {
continue
}

pdi := piece.Impl()
if pdi.Valid(nv) != nil {
continue
}

pdi := piece.DealInfo().Impl() // copy
pieces[i].DealInfo = &pdi

deals[i] = piece.DealInfo().Impl().DealID
if pdi.PublishCid != nil {
deals[i] = pdi.DealID
}
}

log := make([]api.SectorLog, len(info.Log))
Expand Down
2 changes: 1 addition & 1 deletion storage/pipeline/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const (
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"

// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain (deprecated)
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain

SubmitCommitAggregate SectorState = "SubmitCommitAggregate"
Expand Down
Loading

0 comments on commit f929ae1

Please sign in to comment.