Skip to content

Commit

Permalink
Add Node struct and refactor node up (#104)
Browse files Browse the repository at this point in the history
* Update clean task
* Add node struct and refactor node up
* Tweak otel vm counter
* Add argv support to run command
* Fix linter
* Ensure node exits cleanly
  - Ensure internal NATS server is shutdown
  - Recover from panic in fillPool()
* Cleanup prereq validation during node init
* Move additional otel setup under telemetry.go
* Use camelcase naming convention
* Cleanup error handling in node init()
* Rename deployWorkload -> deployRequest on firecracker VM struct
* Fix linter
* Rewrite node prereq constants as camelcase
* Rename build github action
* Rename control api RunRequest -> DeployRequest
* Cleanup agent for naming consistency
* Only respond to trigger subject function invocation if provider returned > 0 bytes
* Interact with telemetry when enabled at runtime
* Remove FIXME
* Drop log added during development
* Generate keypair when creating a new node instance
* Abort pending handshakes when machine manager is shutting down
* Commit go module artifacts
* Add node up specs
* Add node proxy for exposing node-internal types under test
* Use node proxy in node specs
* Make node up spec suite pass
* Make code more readable in machine manager
* Fix linter
* Commit go module artifacts
  - Specified a few modules which were previously being reported as
ambiguous
* Drop -race flag from spec portion of test task
* Remove unused node opts in favor of node configuration
* Upgrade ginkgo version
* Fix rootfs build in test suite
* Appease github actions with longer timeouts
* Build agent before running test suite in github actions
* Revert "Appease github actions with longer timeouts"
This reverts commit 116e147.
* Ack workload deployment after provider success
* Reformat constants in v8 execution provider
* Use errors.Join in agent type validation methods
* Refactor telemetry to initialize otel metrics conditionally
* Remove conditional telemetry access
* Use errors.Join correctly
* Install latest ginkgo in github actions
* Fix bug in telemetry service
* Add --output-interceptor-mode=none to ginkgo run in github actions
  • Loading branch information
kthomas committed Feb 2, 2024
1 parent c304ecf commit 59f22ed
Show file tree
Hide file tree
Showing 41 changed files with 1,595 additions and 914 deletions.
10 changes: 7 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Test | Build
name: Build

on:
push:
Expand Down Expand Up @@ -54,7 +54,11 @@ jobs:
name: Install ginkgo
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
go install github.com/onsi/ginkgo/v2/ginkgo
go install github.com/onsi/ginkgo/v2/ginkgo@latest
-
name: Build agent
working-directory: ./agent/cmd/nex-agent
run: go build -tags netgo -ldflags '-extldflags "-static"'
-
name: Build ui
working-directory: ./ui/web
Expand All @@ -65,7 +69,7 @@ jobs:
-
name: Run spec suite
working-directory: .
run: sudo -E ~/go/bin/ginkgo run -r --randomize-all --vv -race --trace --keep-going ./spec
run: sudo -E ~/go/bin/ginkgo run -r --randomize-all --vv --trace --keep-going --output-interceptor-mode=none ./spec
-
name: Run test suite
working-directory: .
Expand Down
11 changes: 9 additions & 2 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ tasks:
clean:
cmds:
- rm -f nex/nex
- rm -f rootfs.ext4.gz
- sudo rm -rf /opt/cni/bin/*
- sudo rm -rf /var/lib/cni/*
- sudo rm -rf /etc/cni/conf.d/*
- sudo rm -f /tmp/rootfs-*
- sudo rm -f /tmp/.firecracker.*
- sudo rm -rf /tmp/pnats
- sudo rm -f /tmp/*.js
- sudo rm -rf /tmp/*-spec
- sudo rm -rf /tmp/*-spec-nex-conf.json
- sudo rm -rf /tmp/*-spec-nex-wd
- sudo rm -rf /tmp/rootfs-*.ext4
- sudo rm -rf /tmp/*-rootfs.ext4

nex:
dir: nex
Expand All @@ -41,9 +48,9 @@ tasks:
- go build -tags netgo -ldflags '-extldflags "-static"'

test:
deps: [clean]
deps: [clean, agent]
cmds:
- sudo $GOPATH/bin/ginkgo run -r --randomize-all --vv -race --trace --keep-going ./spec #--cover --coverprofile=.coverage-report.out
- sudo $GOPATH/bin/ginkgo run -r --randomize-all --vv --trace --keep-going ./spec #--cover --coverprofile=.coverage-report.out
- go test -v -race ./test

ui:
Expand Down
33 changes: 17 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,9 @@ func (a *Agent) RequestHandshake() error {
return nil
}

func (a *Agent) handleUndeploy(m *nats.Msg) {
err := a.provider.UnDeploy()
if err != nil {
// don't return an error here so worst-case scenario is an ungraceful shutdown,
// not a failure
a.LogError(fmt.Sprintf("Failed to undeploy workload: %s", err))
}
_ = m.Respond([]byte{})
}

// Pull a RunRequest off the wire, get the payload from the shared
// bucket, write it to temp, initialize the execution provider per
// the request, and then validate and deploy a workload
// Pull a deploy request off the wire, get the payload from the shared
// bucket, write it to tmp, initialize the execution provider per the
// request, and then validate and deploy a workload
func (a *Agent) handleDeploy(m *nats.Msg) {
var request agentapi.DeployRequest
err := json.Unmarshal(m.Data, &request)
Expand Down Expand Up @@ -188,14 +178,25 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
return
}

_ = a.workAck(m, true, "Workload accepted")

err = a.provider.Deploy()
if err != nil {
a.LogError(fmt.Sprintf("Failed to deploy workload: %s", err))
} else {
_ = a.workAck(m, true, "Workload deployed")
}
}

func (a *Agent) handleUndeploy(m *nats.Msg) {
err := a.provider.Undeploy()
if err != nil {
// don't return an error here so worst-case scenario is an ungraceful shutdown,
// not a failure
a.LogError(fmt.Sprintf("Failed to undeploy workload: %s", err))
}

_ = m.Respond([]byte{})
}

// cacheExecutableArtifact uses the underlying agent configuration to fetch
// the executable workload artifact from the cache bucket, write it to a
// temporary file and make it executable; this method returns the full
Expand Down Expand Up @@ -257,7 +258,7 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile
return

case <-params.Run:
a.PublishWorkloadStarted(params.VmID, *params.WorkloadName, params.TotalBytes)
a.PublishWorkloadDeployed(params.VmID, *params.WorkloadName, params.TotalBytes)
sleepMillis = workloadExecutionSleepTimeoutMillis

case exit := <-params.Exit:
Expand Down
8 changes: 4 additions & 4 deletions agent/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func (a *Agent) submitLog(msg string, lvl agentapi.LogLevel) {
}

// FIXME-- revisit error handling
func (a *Agent) PublishWorkloadStarted(vmID, workloadName string, totalBytes int64) {
func (a *Agent) PublishWorkloadDeployed(vmID, workloadName string, totalBytes int64) {
select {
case a.agentLogs <- &agentapi.LogEntry{
Source: NexEventSourceNexAgent,
Level: agentapi.LogLevelInfo,
Text: fmt.Sprintf("Workload %s started", workloadName),
Text: fmt.Sprintf("Workload %s deployed", workloadName),
}: // noop
default:
// noop
Expand All @@ -64,9 +64,9 @@ func (a *Agent) PublishWorkloadExited(vmID, workloadName, message string, err bo
}

// FIXME-- this hack is here to get things working... refactor me
txt := fmt.Sprintf("Workload %s stopped", workloadName)
txt := fmt.Sprintf("Workload %s exited", workloadName)
if code == -1 {
txt = fmt.Sprintf("Workload %s failed to start", workloadName)
txt = fmt.Sprintf("Workload %s failed to deploy", workloadName)
}

select {
Expand Down
6 changes: 3 additions & 3 deletions agent/providers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type ExecutionProvider interface {
// Deploy a service (e.g., "elf" and "oci" types) or executable function (e.g., "v8" and "wasm" types)
Deploy() error

// Undeploy a workload, giving it a chance to gracefully clean up after itself (if applicable)
UnDeploy() error

// Execute a deployed function, if supported by the execution provider implementation (e.g., "v8" and "wasm" types)
Execute(subject string, payload []byte) ([]byte, error)

// Undeploy a workload, giving it a chance to gracefully clean up after itself (if applicable)
Undeploy() error

// Validate the executable artifact, e.g., specific characteristics of a
// statically-linked binary or raw source code, depending on provider implementation
Validate() error
Expand Down
24 changes: 14 additions & 10 deletions agent/providers/lib/elf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

// ELF execution provider implementation
type ELF struct {
argv []string
environment map[string]string
name string
tmpFilename string
Expand All @@ -31,18 +32,9 @@ type ELF struct {
stdout io.Writer
}

func (e *ELF) UnDeploy() error {
err := e.cmd.Process.Signal(os.Kill)
if err != nil {
e.fail <- true
return err
}
return nil
}

// Deploy the ELF binary
func (e *ELF) Deploy() error {
cmd := exec.Command(e.tmpFilename)
cmd := exec.Command(e.tmpFilename, e.argv...)
cmd.Stdout = e.stdout
cmd.Stderr = e.stderr

Expand Down Expand Up @@ -91,6 +83,17 @@ func (e *ELF) Execute(subject string, payload []byte) ([]byte, error) {
return nil, errors.New("ELF execution provider does not support execution via trigger subjects")
}

// Undeploy the ELF binary
func (e *ELF) Undeploy() error {
err := e.cmd.Process.Signal(os.Kill)
if err != nil {
e.fail <- true
return err
}

return nil
}

// Validate the underlying artifact to be a 64-bit linux native ELF
// binary that is statically-linked
func (e *ELF) Validate() error {
Expand All @@ -112,6 +115,7 @@ func InitNexExecutionProviderELF(params *agentapi.ExecutionProviderParams) (*ELF
}

return &ELF{
argv: params.Argv,
environment: params.Environment,
name: *params.WorkloadName,
tmpFilename: *params.TmpFilename,
Expand Down
8 changes: 4 additions & 4 deletions agent/providers/lib/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func (o *OCI) Deploy() error {
return errors.New("oci execution provider not yet implemented")
}

func (o *OCI) UnDeploy() error {
return errors.New("oci execution provider not yet implemented")
}

func (o *OCI) Execute(subject string, payload []byte) ([]byte, error) {
return nil, errors.New("oci execution provider does not support execution via trigger subjects")
}

func (o *OCI) Undeploy() error {
return errors.New("oci execution provider not yet implemented")
}

func (o *OCI) Validate() error {
return errors.New("oci execution provider not yet implemented")
}
Expand Down
53 changes: 23 additions & 30 deletions agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import (
v8 "rogchap.com/v8go"
)

const v8MaxFileSizeBytes = int64(12288) // arbitrarily ~12K, for now
const (
nexTriggerSubject = "x-nex-trigger-subject"
nexRuntimeNs = "x-nex-runtime-ns"

v8MaxFileSizeBytes = int64(12288) // arbitrarily ~12K, for now
)

// V8 execution provider implementation
type V8 struct {
Expand All @@ -35,11 +40,6 @@ type V8 struct {
ubs *v8.UnboundScript
}

func (v *V8) UnDeploy() error {
// We shouldn't have to do anything here since the script "owns" no resources
return nil
}

// Deploy expects a `Validate` to have succeeded and `ubs` to be non-nil
func (v *V8) Deploy() error {
if v.ubs == nil {
Expand All @@ -49,27 +49,22 @@ func (v *V8) Deploy() error {
subject := fmt.Sprintf("agentint.%s.trigger", v.vmID)
_, err := v.nc.Subscribe(subject, func(msg *nats.Msg) {
startTime := time.Now()
val, err := v.Execute(msg.Header.Get("x-nex-trigger-subject"), msg.Data)
val, err := v.Execute(msg.Header.Get(nexTriggerSubject), msg.Data)
if err != nil {
// TODO-- propagate this error to agent logs
return
}
runLength := time.Since(startTime).Nanoseconds()
runLength_s := strconv.FormatInt(runLength, 10)

if len(val) > 0 {
err := msg.RespondMsg(&nats.Msg{
Data: val,
Header: nats.Header{
"x-nex-run-nano-sec": []string{
runLength_s,
},
},
})
if err != nil {
// TODO-- propagate this error to agent logs
return
}

runtimeNanos := time.Since(startTime).Nanoseconds()
err = msg.RespondMsg(&nats.Msg{
Data: val,
Header: nats.Header{
nexRuntimeNs: []string{strconv.FormatInt(runtimeNanos, 10)},
},
})
if err != nil {
// TODO-- propagate this error to agent logs
return
}
})
if err != nil {
Expand All @@ -88,13 +83,6 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
return nil, fmt.Errorf("invalid state for execution; no compiled code available for vm: %s", v.name)
}

// TODO-- implement the following
// cmd.Env = make([]string, len(e.environment))
// for k, v := range e.environment {
// item := fmt.Sprintf("%s=%s", strings.ToUpper(k), v)
// cmd.Env = append(cmd.Env, item)
// }

var err error

vals := make(chan *v8.Value, 1)
Expand Down Expand Up @@ -153,6 +141,11 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
return nil, nil
}

func (v *V8) Undeploy() error {
// We shouldn't have to do anything here since the script "owns" no resources
return nil
}

// Validate has the side effect of compiling the executable javascript source
// code and setting `ubs` on the underlying V8 execution provider instance.
func (v *V8) Validate() error {
Expand Down
10 changes: 5 additions & 5 deletions agent/providers/lib/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ type Wasm struct {
nc *nats.Conn // agent NATS connection
}

func (e *Wasm) UnDeploy() error {
// We shouldn't have to do anything here since the wasm "owns" no resources
return nil
}

func (e *Wasm) Deploy() error {
subject := fmt.Sprintf("agentint.%s.trigger", e.vmID)
_, err := e.nc.Subscribe(subject, func(msg *nats.Msg) {
Expand Down Expand Up @@ -85,6 +80,11 @@ func (e *Wasm) Execute(subject string, payload []byte) ([]byte, error) {
return nil, errors.New("unknown")
}

func (e *Wasm) Undeploy() error {
// We shouldn't have to do anything here since the wasm "owns" no resources
return nil
}

func (e *Wasm) Validate() error {
ctx := context.Background()
e.runtime = wazero.NewRuntime(ctx)
Expand Down
6 changes: 3 additions & 3 deletions examples/nodeconfigs/limited_issuers.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"kernel_file": "/path/to/vmlinux-5.10",
"rootfs_file": "/path/to/rootfs.ext4",
"kernel_filepath": "/path/to/vmlinux-5.10",
"rootfs_filepath": "/path/to/rootfs.ext4",
"machine_pool_size": 1,
"cni": {
"network_name": "fcnet",
Expand All @@ -14,4 +14,4 @@
"AARBEQDCEKB7NYZLZRXAOAF6QGYGCN636VTN45USLIIW4QLG7Z2MBGH4",
"ABAGWNQ5V5H6LVODYATY5Q27OBQASRLSUG23FYBDWUR5BFI5UIMQ5GOQ"
]
}
}
6 changes: 3 additions & 3 deletions examples/nodeconfigs/rate_limited.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"kernel_file": "/path/to/vmlinux-5.10",
"rootfs_file": "/path/to/rootfs.ext4",
"kernel_filepath": "/path/to/vmlinux-5.10",
"rootfs_filepath": "/path/to/rootfs.ext4",
"machine_pool_size": 1,
"cni": {
"network_name": "fcnet",
Expand All @@ -22,4 +22,4 @@
"size": 100
}
}
}
}
Loading

0 comments on commit 59f22ed

Please sign in to comment.