Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add vm execution lanes #6011

Merged
merged 2 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/consensus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/venus/pkg/config"
"github.com/filecoin-project/venus/pkg/constants"
"github.com/filecoin-project/venus/pkg/fvm"
"github.com/filecoin-project/venus/pkg/vm/vmcontext"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/reward"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -112,6 +113,7 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
Tracing: vmOpts.Tracing,
ActorDebugging: vmOpts.ActorDebugging,
ReturnEvents: vmOpts.ReturnEvents,
ExecutionLane: vmcontext.ExecutionLanePriority,
}

return fvm.NewVM(ctx, vmOpt)
Expand Down
17 changes: 17 additions & 0 deletions pkg/fvm/fvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,23 @@ func (r *xRedirect) MarshalCBOR(w io.Writer) error {
var useFvmDebug = os.Getenv("VENUS_FVM_DEVELOPER_DEBUG") == "1"

func NewVM(ctx context.Context, opts vm.VmOption) (vm.Interface, error) {

switch opts.ExecutionLane {
case vmcontext.ExecutionLaneDefault, vmcontext.ExecutionLanePriority:
default:
return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
}

vmi, err := makeVM(ctx, opts)
if err != nil {
return nil, err
}

return vmcontext.NewVMExecutor(vmi, opts.ExecutionLane), nil
}

func makeVM(ctx context.Context, opts vm.VmOption) (vm.Interface, error) {

if opts.NetworkVersion >= network.Version16 {
if useFvmDebug {
return NewDualExecutionFVM(ctx, &opts)
Expand Down
163 changes: 163 additions & 0 deletions pkg/vm/vmcontext/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package vmcontext

import (
"context"
"os"
"strconv"
"sync"

"github.com/ipfs/go-cid"

"github.com/filecoin-project/venus/venus-shared/types"
)

const (
// DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of
// concurrent active executions.
// This is the default value in filecoin-ffi
DefaultAvailableExecutionLanes = 4
// DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations.
// This is purely userspace, but we believe it is a reasonable default, even with more available
// lanes.
DefaultPriorityExecutionLanes = 2
)

// the execution environment; see below for definition, methods, and initialization
var execution *executionEnv

// implementation of vm executor with simple sanity check preventing use after free.
type vmExecutor struct {
vmi Interface
lane ExecutionLane
}

var _ Interface = (*vmExecutor)(nil)

func NewVMExecutor(vmi Interface, lane ExecutionLane) Interface {
return &vmExecutor{vmi: vmi, lane: lane}
}

func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*Ret, error) {
token := execution.getToken(e.lane)
defer token.Done()

return e.vmi.ApplyMessage(ctx, cmsg)
}

func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg types.ChainMsg) (*Ret, error) {
token := execution.getToken(e.lane)
defer token.Done()

return e.vmi.ApplyImplicitMessage(ctx, msg)
}

func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
return e.vmi.Flush(ctx)
}

type executionToken struct {
lane ExecutionLane
reserved int
}

func (token *executionToken) Done() {
execution.putToken(token)
}

type executionEnv struct {
mx *sync.Mutex
cond *sync.Cond

// available executors
available int
// reserved executors
reserved int
}

func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {

e.mx.Lock()
defer e.mx.Unlock()

switch lane {
case ExecutionLaneDefault:
for e.available <= e.reserved {
e.cond.Wait()
}

e.available--

return &executionToken{lane: lane, reserved: 0}

case ExecutionLanePriority:
for e.available == 0 {
e.cond.Wait()
}

e.available--

reserving := 0
if e.reserved > 0 {
e.reserved--
reserving = 1
}

return &executionToken{lane: lane, reserved: reserving}

default:
// already checked at interface boundary in NewVM, so this is appropriate
panic("bogus execution lane")
}
}

func (e *executionEnv) putToken(token *executionToken) {
e.mx.Lock()
defer e.mx.Unlock()

e.available++
e.reserved += token.reserved

// Note: Signal is unsound, because a priority token could wake up a non-priority
// goroutnie and lead to deadlock. So Broadcast it must be.
e.cond.Broadcast()

}

func init() {
var err error

available := DefaultAvailableExecutionLanes
if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" {
available, err = strconv.Atoi(concurrency)
if err != nil {
panic(err)
}
}

priority := DefaultPriorityExecutionLanes
if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" {
priority, err = strconv.Atoi(reserved)
if err != nil {
panic(err)
}
}

// some sanity checks
if available < 2 {
panic("insufficient execution concurrency")
}

if available <= priority {
panic("insufficient default execution concurrency")
}

mx := &sync.Mutex{}
cond := sync.NewCond(mx)

execution = &executionEnv{
mx: mx,
cond: cond,
available: available,
reserved: priority,
}
}
11 changes: 11 additions & 0 deletions pkg/vm/vmcontext/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ type (
TipSetGetter func(context.Context, abi.ChainEpoch) (types.TipSetKey, error)
)

type ExecutionLane int

const (
// ExecutionLaneDefault signifies a default, non prioritized execution lane.
ExecutionLaneDefault ExecutionLane = iota
// ExecutionLanePriority signifies a prioritized execution lane with reserved resources.
ExecutionLanePriority
)

type VmOption struct { //nolint
CircSupplyCalculator CircSupplyCalculator
LookbackStateGetter LookbackStateGetter
Expand All @@ -53,6 +62,8 @@ type VmOption struct { //nolint
ActorDebugging bool
// ReturnEvents decodes and returns emitted events.
ReturnEvents bool
// ExecutionLane specifies the execution priority of the created vm
ExecutionLane ExecutionLane
}

type ILookBack interface {
Expand Down