Skip to content

Commit

Permalink
Complete initial plumbing for executing functions via trigger subjects (
Browse files Browse the repository at this point in the history
#36)

* Pass trigger subject to v8 function
* Resolve workload type in devrun
* Rename hello-nex v8 example to echofunction
* Use flags instead of args for workload type and trigger subjects
* Remove total bytes param validation from v8 provider
  • Loading branch information
kthomas authored Jan 8, 2024
1 parent 992954a commit 4f3195e
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 19 deletions.
17 changes: 17 additions & 0 deletions agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,24 @@ package agentapi
import (
"errors"
"io"
"strings"
"time"

"github.com/nats-io/nats.go"
)

// NexExecutionProviderELF Executable Linkable Format execution provider
const NexExecutionProviderELF = "elf"

// NexExecutionProviderV8 V8 execution provider
const NexExecutionProviderV8 = "v8"

// NexExecutionProviderOCI OCI execution provider
const NexExecutionProviderOCI = "oci"

// NexExecutionProviderWasm Wasm execution provider
const NexExecutionProviderWasm = "wasm"

// WorkloadCacheBucket is an internal, non-public bucket for sharing files between host and agent
const WorkloadCacheBucket = "NEXCACHE"

Expand Down Expand Up @@ -73,6 +86,10 @@ func (w *WorkRequest) Validate() bool {

if w.WorkloadType == nil {
w.Errors = append(w.Errors, errors.New("workload type is required"))
} else if (strings.EqualFold(*w.WorkloadType, NexExecutionProviderV8) ||
strings.EqualFold(*w.WorkloadType, NexExecutionProviderWasm)) &&
len(w.TriggerSubjects) == 0 {
w.Errors = append(w.Errors, errors.New("at least one trigger subject is required for this workload type"))
}

return len(w.Errors) == 0
Expand Down
File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "hello-nex",
"name": "echofunction",
"version": "1.0.0",
"description": "nex example executable javascript project",
"author": "Synadia Communications, Inc.",
Expand Down
6 changes: 6 additions & 0 deletions examples/v8/echofunction/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export default () => {
return (subject, payload) => {
console.log(subject)
return payload
}
}
3 changes: 0 additions & 3 deletions examples/v8/hello-nex/src/index.js

This file was deleted.

16 changes: 11 additions & 5 deletions nex-agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,19 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
return
}

arg, err := v8.NewValue(v.ctx.Isolate(), payload)
argv1, err := v8.NewValue(v.ctx.Isolate(), subject)
if err != nil {
errs <- err
return
}

val, err = fn.Call(v.ctx.Global(), arg)
argv2, err := v8.NewValue(v.ctx.Isolate(), payload)
if err != nil {
errs <- err
return
}

val, err = fn.Call(v.ctx.Global(), argv1, argv2)
if err != nil {
errs <- err
return
Expand Down Expand Up @@ -173,9 +179,9 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8,
return nil, errors.New("V8 execution provider requires a temporary filename parameter")
}

if params.TotalBytes == nil {
return nil, errors.New("V8 execution provider requires a VM id parameter")
}
// if params.TotalBytes == nil {
// return nil, errors.New("V8 execution provider requires a total bytes parameter")
// }

return &V8{
environment: params.Environment,
Expand Down
5 changes: 3 additions & 2 deletions nex-cli/cmd/nex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ func main() {
run.Flag("issuer", "Path to a seed key to sign the workload JWT as the issuer").Required().ExistingFileVar(&cli.RunOpts.ClaimsIssuerFile)
run.Arg("env", "Environment variables to pass to workload").StringMapVar(&cli.RunOpts.Env)
run.Flag("name", "Name of the workload. Must be alphabetic (lowercase)").Required().StringVar(&cli.RunOpts.Name)
run.Arg("type", "Type of workload, e.g., \"elf\", \"v8\", \"oci\", \"wasm\"").Required().Default("elf").StringVar(&cli.RunOpts.WorkloadType)
run.Flag("type", "Type of workload, e.g., \"elf\", \"v8\", \"oci\", \"wasm\"").StringVar(&cli.RunOpts.WorkloadType)
run.Flag("description", "Description of the workload").StringVar(&cli.RunOpts.Description)
run.Arg("trigger_subjects", "Trigger subjects to register for subsequent workload execution, if supported by the workload type").StringsVar(&cli.RunOpts.TriggerSubjects)
run.Flag("trigger_subjects", "Trigger subjects to register for subsequent workload execution, if supported by the workload type").StringsVar(&cli.RunOpts.TriggerSubjects)
run.Action(cli.RunWorkload)

yeet := ncli.Command("devrun", "Run a workload locating reasonable defaults (developer mode)").Alias("yeet")
yeet.Arg("file", "File to run").Required().ExistingFileVar(&cli.DevRunOpts.Filename)
yeet.Arg("env", "Environment variables to pass to workload").StringMapVar(&cli.RunOpts.Env)
yeet.Flag("trigger_subjects", "Trigger subjects to register for subsequent workload execution, if supported by the workload type").StringsVar(&cli.RunOpts.TriggerSubjects)
yeet.Flag("stop", "Indicates whether to stop pre-existing workloads during launch. Disable with caution").Default("true").BoolVar(&cli.DevRunOpts.AutoStop)
yeet.Action(cli.RunDevWorkload)

Expand Down
24 changes: 19 additions & 5 deletions nex-cli/devrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

agentapi "github.com/ConnectEverything/nex/agent-api"
controlapi "github.com/ConnectEverything/nex/control-api"
"github.com/choria-io/fisk"
"github.com/nats-io/nats.go"
Expand All @@ -22,8 +23,11 @@ var (
)

const (
defaultFileMode = os.FileMode(int(0770)) // owner and group r/w/x
objectStoreName = "NEXCLIFILES"
defaultFileMode = os.FileMode(int(0770)) // owner and group r/w/x
defaultWorkloadType = agentapi.NexExecutionProviderELF
fileExtensionJS = "js"
fileExtensionWasm = "wasm"
objectStoreName = "NEXCLIFILES"
)

func init() {
Expand Down Expand Up @@ -105,7 +109,7 @@ func RunDevWorkload(ctx *fisk.ParseContext) error {
controlapi.SenderXKey(publisherXKey),
controlapi.TargetNode(target.NodeId),
controlapi.TargetPublicXKey(targetPublicXkey),
controlapi.TriggerSubjects(nil),
controlapi.TriggerSubjects(RunOpts.TriggerSubjects),
controlapi.WorkloadName(workloadName),
controlapi.WorkloadType(workloadType),
controlapi.Checksum("abc12345TODOmakethisreal"),
Expand Down Expand Up @@ -145,14 +149,24 @@ func uploadWorkload(nc *nats.Conn, filename string) (string, string, string, err
return "", "", "", err
}
key := filepath.Base(filename)
key = strings.ReplaceAll(key, ".", "_")
key = strings.ReplaceAll(key, ".", "")

_, err = bucket.PutBytes(key, bytes)
if err != nil {
return "", "", "", err
}

return fmt.Sprintf("nats://%s/%s", objectStoreName, key), key, "elf", nil
var workloadType string
switch strings.Replace(filepath.Ext(DevRunOpts.Filename), ".", "", 1) {
case fileExtensionJS:
workloadType = agentapi.NexExecutionProviderV8
case fileExtensionWasm:
workloadType = agentapi.NexExecutionProviderWasm
default:
workloadType = defaultWorkloadType
}

return fmt.Sprintf("nats://%s/%s", objectStoreName, key), key, workloadType, nil
}

func readOrGenerateIssuer() (nkeys.KeyPair, error) {
Expand Down
1 change: 0 additions & 1 deletion nex-node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
const (
EventSubjectPrefix = "$NEX.events"
LogSubjectPrefix = "$NEX.logs"
TriggerSubjectPrefix = "$NEX.triggers"
WorkloadCacheBucketName = "NEXCACHE"

defaultHandshakeTimeoutMillis = 5000
Expand Down

0 comments on commit 4f3195e

Please sign in to comment.