Skip to content

Commit

Permalink
Refactor execution provider to support trigger subjects
Browse files Browse the repository at this point in the history
This commit updates the ExecutionProvider interface to support "deploy"
(applicable to all workload types) and "execute" (applicable only to v8
and wasm types, for now).
  • Loading branch information
kthomas committed Jan 7, 2024
1 parent 56bac52 commit 572d14d
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 29 deletions.
18 changes: 13 additions & 5 deletions agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"io"
"time"

"github.com/nats-io/nats.go"
)

// WorkloadCacheBucket is an internal, non-public bucket for sharing files between host and agent
Expand All @@ -15,6 +17,7 @@ const DefaultRunloopSleepTimeoutMillis = 25
// ExecutionProviderParams parameters for initializing a specific execution provider
type ExecutionProviderParams struct {
WorkRequest
TriggerSubjects []string `json:"trigger_subjects"`

// Fail channel receives bool upon command failing to start
Fail chan bool `json:"-"`
Expand All @@ -30,14 +33,19 @@ type ExecutionProviderParams struct {

TmpFilename *string `json:"-"`
VmID string `json:"-"`

// NATS connection which be injected into the execution provider
NATSConn *nats.Conn `json:"-"`
}

// FIXME? WorkRequest -> DeployRequest?
type WorkRequest struct {
Environment map[string]string `json:"environment"`
Hash *string `json:"hash,omitempty"`
TotalBytes *int32 `json:"total_bytes,omitempty"`
WorkloadName *string `json:"workload_name,omitempty"`
WorkloadType *string `json:"workload_type,omitempty"`
Environment map[string]string `json:"environment"`
Hash *string `json:"hash,omitempty"`
TotalBytes *int32 `json:"total_bytes,omitempty"`
TriggerSubjects []string `json:"trigger_subjects"`
WorkloadName *string `json:"workload_name,omitempty"`
WorkloadType *string `json:"workload_type,omitempty"`

Stderr io.Writer `json:"-"`
Stdout io.Writer `json:"-"`
Expand Down
8 changes: 8 additions & 0 deletions control-api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"regexp"
"strings"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
Expand Down Expand Up @@ -75,6 +76,13 @@ func NewRunRequest(opts ...RequestOption) (*RunRequest, error) {
return req, nil
}

// Returns true if the run request supports trigger subjects
func (request *RunRequest) SupportsTriggerSubjects() bool {
return (strings.EqualFold(*request.WorkloadType, "v8") ||
strings.EqualFold(*request.WorkloadType, "wasm")) &&
len(request.TriggerSubjects) > 0
}

// This will validate a request's workload JWT, decrypt the request environment. It will not
// perform a comparison of the hash found in the claims with a recipient's expected hash
func (request *RunRequest) Validate(myKey nkeys.KeyPair) (*jwt.GenericClaims, error) {
Expand Down
5 changes: 4 additions & 1 deletion nex-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (a *Agent) handleWorkDispatched(m *nats.Msg) {

_ = a.workAck(m, true, "Workload accepted")

err = provider.Execute()
err = provider.Deploy()
if err != nil {
a.LogError(fmt.Sprintf("Failed to execute workload: %s", err))
}
Expand Down Expand Up @@ -214,6 +214,9 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.WorkRequest, tmpFile st
Fail: make(chan bool),
Run: make(chan bool),
Exit: make(chan int),

NATSConn: a.nc,
TriggerSubjects: req.TriggerSubjects,
}

go func() {
Expand Down
9 changes: 6 additions & 3 deletions nex-agent/providers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ const NexExecutionProviderOCI = "oci"
const NexExecutionProviderWasm = "wasm"

// ExecutionProvider implementations provide support for a specific
// execution environment "persona" -- e.g., statically-linked ELF
// execution environment pattern -- e.g., statically-linked ELF
// binaries, serverless JavaScript functions, OCI images, Wasm, etc.
type ExecutionProvider interface {
// Execute is the equivalent to a language-specific main() entrypoint
Execute() error
// Deploy a service (e.g., "elf" and "oci" types) or executable function (e.g., "v8" and "wasm" types)
Deploy() error

// Execute a deployed function, if supported by the execution provider implementation (e.g., "v8" and "wasm" types)
Execute(subject string, payload []byte) ([]byte, error)

// Validate the executable artifact, e.g., specific characteristics of a
// statically-linked binary or raw source code, depending on provider implementation
Expand Down
8 changes: 6 additions & 2 deletions nex-agent/providers/lib/elf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type ELF struct {
stdout io.Writer
}

// Execute the ELF binary
func (e *ELF) Execute() error {
// Deploy the ELF binary
func (e *ELF) Deploy() error {
cmd := exec.Command(e.tmpFilename)
cmd.Stdout = e.stdout
cmd.Stderr = e.stderr
Expand Down Expand Up @@ -73,6 +73,10 @@ func (e *ELF) Execute() error {
return nil
}

func (e *ELF) Execute(subject string, payload []byte) ([]byte, error) {
return nil, errors.New("ELF execution provider does not support execution via trigger subjects")
}

// Validate the underlying artifact to be a 64-bit linux native ELF
// binary that is statically-linked
func (e *ELF) Validate() error {
Expand Down
6 changes: 5 additions & 1 deletion nex-agent/providers/lib/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ import (
type OCI struct {
}

func (o *OCI) Execute() error {
func (o *OCI) Deploy() error {
return errors.New("oci execution provider not yet implemented")
}

func (o *OCI) Execute(subject string, payload []byte) ([]byte, error) {
return nil, errors.New("oci execution provider does not support execution via trigger subjects")
}

func (o *OCI) Validate() error {
return errors.New("oci execution provider not yet implemented")
}
Expand Down
59 changes: 48 additions & 11 deletions nex-agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

agentapi "github.com/ConnectEverything/nex/agent-api"
"github.com/nats-io/nats.go"
v8 "rogchap.com/v8go"
)

Expand All @@ -27,16 +28,45 @@ type V8 struct {
// stderr io.Writer
// stdout io.Writer

nc *nats.Conn // agent NATS connection

ctx *v8.Context
ubs *v8.UnboundScript
}

// Execute expects a `Validate` to have succeeded and `ubs` to be non-nil
func (v *V8) Execute() error {
// Deploy expects a `Validate` to have succeeded and `ubs` to be non-nil
func (v *V8) Deploy() error {
if v.ubs == nil {
return fmt.Errorf("invalid state for execution; no compiled code available for vm: %s", v.name)
}

subject := fmt.Sprintf("agentint.%s.trigger", v.vmID)
_, err := v.nc.Subscribe(subject, func(msg *nats.Msg) {
val, err := v.Execute(subject, msg.Data)
if err != nil {
// TODO-- propagate this error to agent logs
return
}

if len(val) > 0 {
_ = msg.Respond(val)
}
})
if err != nil {
return fmt.Errorf("failed to subscribe to trigger: %s", err)
}

return nil
}

// Trigger execution of the deployed function; expects a `Validate` to have succeeded and `ubs` to be non-nil.
// The executed function can optionally return a value, in which case it will be deemed a reply and returned
// to the caller. In the case of a nil or empty value returned by the function, no reply will be sent.
func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
if v.ubs == nil {
return nil, fmt.Errorf("invalid state for execution; no compiled code available for vm: %s", v.name)
}

// TODO-- implement the following
// cmd.Env = make([]string, len(e.environment))
// for k, v := range e.environment {
Expand All @@ -46,32 +76,38 @@ func (v *V8) Execute() error {

var err error

// vals := make(chan *v8.Value, 1)
vals := make(chan *v8.Value, 1)
errs := make(chan error, 1)

go func() {
_, err = v.ubs.Run(v.ctx)

val, err := v.ubs.Run(v.ctx)
if err != nil {
errs <- err
return
}

vals <- val
}()

select {
// case <-vals:
// we don't care about any returned values... executed scripts should return nothing...
case <-errs:
// javascript error
case val := <-vals:
retval, err := val.MarshalJSON()
if err != nil {
return nil, err
}
return retval, nil
case err := <-errs:
return nil, err
case <-time.After(time.Millisecond * agentapi.DefaultRunloopSleepTimeoutMillis):
if err != nil {
// TODO-- check for v8.JSError as this type has Message, Location and StackTrace we can log...
return fmt.Errorf("failed to invoke default export: %s", err)
return nil, fmt.Errorf("failed to invoke default export: %s", err)
}

v.run <- true
}

return nil
return nil, nil
}

// Validate has the side effect of compiling the executable javascript source
Expand Down Expand Up @@ -137,6 +173,7 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8,
run: params.Run,
exit: params.Exit,

nc: params.NATSConn,
ctx: v8.NewContext(),
}, nil
}
6 changes: 5 additions & 1 deletion nex-agent/providers/lib/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ type Wasm struct {
wasmFile []byte
}

func (e *Wasm) Execute() error {
func (e *Wasm) Deploy() error {
return errors.New("wasm execution provider not yet implemented")
}

func (e *Wasm) Execute(subject string, payload []byte) ([]byte, error) {
return nil, errors.New("wasm execution provider does not support trigger execution... yet ;)")
}

func (e *Wasm) Validate() error {
return errors.New("wasm execution provider not yet implemented")
}
Expand Down
57 changes: 52 additions & 5 deletions nex-node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,16 @@ func (m *MachineManager) Start() error {
return nil
}

// TODO-- refactor Dispatch -> Deploy; RunRequest -> DeployRequest
func (m *MachineManager) DispatchWork(vm *runningFirecracker, workloadName, namespace string, request controlapi.RunRequest) error {
// TODO: make the bytes and hash/digest available to the agent
req := agentapi.WorkRequest{
WorkloadName: &workloadName,
Hash: nil, // FIXME
TotalBytes: nil, // FIXME
WorkloadType: request.WorkloadType, // FIXME-- audit all types for string -> *string, and validate...
Environment: request.WorkloadEnvironment,
WorkloadName: &workloadName,
Hash: nil, // FIXME
TotalBytes: nil, // FIXME
TriggerSubjects: request.TriggerSubjects,
WorkloadType: request.WorkloadType, // FIXME-- audit all types for string -> *string, and validate...
Environment: request.WorkloadEnvironment,
}
bytes, _ := json.Marshal(req)

Expand All @@ -145,6 +147,51 @@ func (m *MachineManager) DispatchWork(vm *runningFirecracker, workloadName, name

if !workResponse.Accepted {
return fmt.Errorf("workload rejected by agent: %s", *workResponse.Message)
} else if request.SupportsTriggerSubjects() {
for _, tsub := range request.TriggerSubjects {
_, err := m.nc.Subscribe(tsub, func(msg *nats.Msg) {
_tsub := fmt.Sprintf("agentint.%s.trigger", msg.Data)
resp, err := m.ncInternal.Request(_tsub, msg.Data, time.Millisecond*10000) // FIXME-- make timeout configurable
if err != nil {
m.log.WithField("vmid", vm.vmmID).
WithField("trigger_subject", tsub).
WithField("workload_type", *request.WorkloadType).
WithError(err).
Error("Failed to request agent execution via internal trigger subject")
} else if resp != nil {
m.log.WithField("vmid", vm.vmmID).
WithField("trigger_subject", tsub).
WithField("workload_type", *request.WorkloadType).
WithField("payload_size", len(resp.Data)).
Debug("Received response from execution via trigger subject")

err = msg.Respond(msg.Data)
if err != nil {
m.log.WithField("vmid", vm.vmmID).
WithField("trigger_subject", tsub).
WithField("workload_type", *request.WorkloadType).
WithError(err).
Error("Failed to respond to trigger subject subscription request for deployed workload")
}
}
})
if err != nil {
m.log.WithField("vmid", vm.vmmID).
WithField("trigger_subject", tsub).
WithField("workload_type", *request.WorkloadType).
WithError(err).
Error("Failed to create trigger subject subscription for deployed workload")
// TODO-- rollback the otherwise accepted deployment and return the error below...
// return err
}

m.log.WithField("vmid", vm.vmmID).
WithField("trigger_subject", tsub).
WithField("workload_type", *request.WorkloadType).
Info("Created trigger subject subscription for deployed workload")
}

return nil
}

vm.workloadStarted = time.Now().UTC()
Expand Down

0 comments on commit 572d14d

Please sign in to comment.