From f69aefe44166b38c43d6cd996d8ad8695843b85b Mon Sep 17 00:00:00 2001 From: Jordan Rash <15827604+jordan-rash@users.noreply.github.com> Date: Wed, 22 May 2024 14:04:20 -0600 Subject: [PATCH] Reconfigure Default Workload Types (#240) * remove elf and create workload type * add node capabilities based on build --------- Signed-off-by: Jordan Rash <15827604+jordan-rash@users.noreply.github.com> --- .goreleaser.yaml | 6 --- agent/agent.go | 7 +-- agent/providers/api.go | 31 ++++------- agent/providers/lib/native.go | 2 +- control-api/run.go | 19 +++---- control-api/types.go | 33 ++++++------ internal/agent-api/types.go | 54 ++++++++----------- internal/models/cli.go | 2 +- internal/models/config.go | 14 +++-- internal/models/node_capabilities.go | 16 ++++++ internal/models/node_darwin.go | 13 +++++ internal/models/node_linux_amd64.go | 14 +++++ internal/models/node_linux_arm64.go | 13 +++++ internal/models/node_windows.go | 11 ++++ internal/{agent-api => models}/utils.go | 2 +- internal/node/config.go | 2 +- internal/node/controlapi.go | 17 +++--- internal/node/node.go | 4 ++ .../processmanager/firecracker_procman.go | 6 +-- internal/node/workload_mgr.go | 18 +++---- internal/node/workload_mgr_events.go | 2 +- nex/devrunner.go | 3 +- nex/nex.go | 17 +++++- nex/tui/home/data.go | 2 +- spec/node_linux_test.go | 20 +++---- spec/node_windows_test.go | 16 +++--- test/wasm_test.go | 7 +-- 27 files changed, 206 insertions(+), 145 deletions(-) create mode 100644 internal/models/node_capabilities.go create mode 100644 internal/models/node_darwin.go create mode 100644 internal/models/node_linux_amd64.go create mode 100644 internal/models/node_linux_arm64.go create mode 100644 internal/models/node_windows.go rename internal/{agent-api => models}/utils.go (88%) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 8c0745af..25817375 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -16,12 +16,6 @@ builds: goarch: - amd64 - arm64 - - arm - goarm: - - 7 - ignore: - - goos: windows - goarch: arm ldflags: - -s -w --X main.VERSION={{.Version}} -X main.COMMIT={{.Commit}} -X main.BUILDDATE={{.Date}} -X github.com/synadia-io/nex/internal/node.VERSION={{.Version}} -X github.com/synadia-io/nex/internal/node.COMMIT={{.Commit}} -X github.com/synadia-io/nex/internal/node.BUILDDATE={{.Date}} - -extldflags "-static" diff --git a/agent/agent.go b/agent/agent.go index 040422cc..bc2fb3f8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -18,6 +18,7 @@ import ( "github.com/nats-io/nats.go" "github.com/synadia-io/nex/agent/providers" agentapi "github.com/synadia-io/nex/internal/agent-api" + "github.com/synadia-io/nex/internal/models" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) @@ -186,7 +187,7 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e fileName := fmt.Sprintf("workload-%s", *a.md.VmID) tempFile := path.Join(os.TempDir(), fileName) - if strings.EqualFold(runtime.GOOS, "windows") && strings.EqualFold(*req.WorkloadType, "elf") { + if strings.EqualFold(runtime.GOOS, "windows") && req.WorkloadType == models.NexWorkloadNative { tempFile = fmt.Sprintf("%s.exe", tempFile) } @@ -289,7 +290,7 @@ func (a *Agent) handleDeploy(m *nats.Msg) { a.provider = provider shouldValidate := true - if !a.sandboxed && strings.EqualFold(*request.WorkloadType, agentapi.NexExecutionProviderELF) { + if !a.sandboxed && request.WorkloadType == models.NexWorkloadNative { shouldValidate = false } @@ -469,7 +470,7 @@ func (a *Agent) submitLog(msg string, lvl agentapi.LogLevel) { func (a *Agent) workAck(m *nats.Msg, accepted bool, msg string) error { ack := agentapi.DeployResponse{ Accepted: accepted, - Message: agentapi.StringOrNil(msg), + Message: models.StringOrNil(msg), } bytes, err := json.Marshal(&ack) diff --git a/agent/providers/api.go b/agent/providers/api.go index c8a0ffa5..fcdc1667 100644 --- a/agent/providers/api.go +++ b/agent/providers/api.go @@ -6,22 +6,9 @@ import ( "github.com/synadia-io/nex/agent/providers/lib" agentapi "github.com/synadia-io/nex/internal/agent-api" + "github.com/synadia-io/nex/internal/models" ) -// TODO: change this to "native" from "elf" when appropriate - -// NexExecutionProviderNative Executable Linkable Format execution provider -const NexExecutionProviderNative = "elf" - -// NexExecutionProviderV8 V8 execution provider -const NexExecutionProviderV8 = "v8" - -// NexExecutionProviderOCI OCI execution provider -const NexExecutionProviderOCI = "oci" - -// NexExecutionProviderWasm Wasm execution provider -const NexExecutionProviderWasm = "wasm" - // ExecutionProvider implementations provide support for a specific // execution environment pattern -- e.g., statically-linked ELF // binaries, serverless JavaScript functions, OCI images, Wasm, etc. @@ -42,19 +29,19 @@ type ExecutionProvider interface { // NewExecutionProvider initializes and returns an execution provider for a given work request func NewExecutionProvider(params *agentapi.ExecutionProviderParams) (ExecutionProvider, error) { - if params.WorkloadType == nil { - return nil, errors.New("execution provider factory requires a workload type parameter") - } + // if params.WorkloadType == nil { + // return nil, errors.New("execution provider factory requires a workload type parameter") + // } - switch *params.WorkloadType { - case NexExecutionProviderNative: + switch params.WorkloadType { + case models.NexWorkloadNative: return lib.InitNexExecutionProviderNative(params) - case NexExecutionProviderV8: + case models.NexWorkloadV8: return lib.InitNexExecutionProviderV8(params) - case NexExecutionProviderOCI: + case models.NexWorkloadOCI: // TODO-- return lib.InitNexExecutionProviderOCI(params), nil return nil, errors.New("oci execution provider not yet implemented") - case NexExecutionProviderWasm: + case models.NexWorkloadWasm: return lib.InitNexExecutionProviderWasm(params) default: break diff --git a/agent/providers/lib/native.go b/agent/providers/lib/native.go index 73b2b809..d86c0d2e 100644 --- a/agent/providers/lib/native.go +++ b/agent/providers/lib/native.go @@ -155,7 +155,7 @@ func validateNativeBinary(path string) error { func verifyStatic(elf *elf.File) error { for _, prog := range elf.Progs { if prog.ProgHeader.Type == 3 { // PT_INTERP - return errors.New("elf binary contains at least one dynamically linked dependency") + return errors.New("native binary contains at least one dynamically linked dependency") } } return nil diff --git a/control-api/run.go b/control-api/run.go index 65e5e98f..07b69062 100644 --- a/control-api/run.go +++ b/control-api/run.go @@ -11,14 +11,15 @@ import ( "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" + "github.com/synadia-io/nex/internal/models" ) type DeployRequest struct { - Argv []string `json:"argv,omitempty"` - Description *string `json:"description,omitempty"` - WorkloadType *string `json:"type"` - Location *url.URL `json:"location"` - Essential *bool `json:"essential,omitempty"` + Argv []string `json:"argv,omitempty"` + Description *string `json:"description,omitempty"` + WorkloadType models.NexWorkload `json:"type"` + Location *url.URL `json:"location"` + Essential *bool `json:"essential,omitempty"` // Contains claims for the workload: name, hash WorkloadJwt *string `json:"workload_jwt"` @@ -69,7 +70,7 @@ func NewDeployRequest(opts ...RequestOption) (*DeployRequest, error) { req := &DeployRequest{ Argv: reqOpts.argv, Description: &reqOpts.workloadDescription, - WorkloadType: &reqOpts.workloadType, + WorkloadType: reqOpts.workloadType, Location: &reqOpts.location, WorkloadJwt: &workloadJwt, Environment: &encryptedEnv, @@ -145,7 +146,7 @@ func (request *DeployRequest) DecryptRequestEnvironment(recipientXKey nkeys.KeyP type requestOptions struct { argv []string workloadName string - workloadType string + workloadType models.NexWorkload workloadDescription string location url.URL env map[string]string @@ -177,8 +178,8 @@ func WorkloadName(name string) RequestOption { } } -// Type of the workload, e.g., one of "elf", "v8", "oci", "wasm" for this request -func WorkloadType(workloadType string) RequestOption { +// Type of the workload, e.g., one of "native", "v8", "oci", "wasm" for this request +func WorkloadType(workloadType models.NexWorkload) RequestOption { return func(o requestOptions) requestOptions { o.workloadType = workloadType return o diff --git a/control-api/types.go b/control-api/types.go index cdea60af..da4b1b9b 100644 --- a/control-api/types.go +++ b/control-api/types.go @@ -4,6 +4,7 @@ import ( "log/slog" cloudevents "github.com/cloudevents/sdk-go" + "github.com/synadia-io/nex/internal/models" ) const ( @@ -52,10 +53,10 @@ type WorkloadPingResponse struct { } type WorkloadPingMachineSummary struct { - Id string `json:"id"` - Namespace string `json:"namespace"` - Name string `json:"name"` - WorkloadType string `json:"type"` + Id string `json:"id"` + Namespace string `json:"namespace"` + Name string `json:"name"` + WorkloadType models.NexWorkload `json:"type"` } type LameDuckResponse struct { @@ -70,13 +71,13 @@ type MemoryStat struct { } type InfoResponse struct { - Version string `json:"version"` - Uptime string `json:"uptime"` - PublicXKey string `json:"public_xkey"` - Tags map[string]string `json:"tags,omitempty"` - Memory *MemoryStat `json:"memory,omitempty"` - Machines []MachineSummary `json:"machines"` - SupportedWorkloadTypes []string `json:"supported_workload_types,omitempty"` + Version string `json:"version"` + Uptime string `json:"uptime"` + PublicXKey string `json:"public_xkey"` + Tags map[string]string `json:"tags,omitempty"` + Memory *MemoryStat `json:"memory,omitempty"` + Machines []MachineSummary `json:"machines"` + SupportedWorkloadTypes []models.NexWorkload `json:"supported_workload_types,omitempty"` } type MachineSummary struct { @@ -88,11 +89,11 @@ type MachineSummary struct { } type WorkloadSummary struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Runtime string `json:"runtime"` - WorkloadType string `json:"type"` - Hash string `json:"hash"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Runtime string `json:"runtime"` + WorkloadType models.NexWorkload `json:"type"` + Hash string `json:"hash"` } type Envelope struct { diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index e1ed3819..b03610be 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -5,25 +5,13 @@ import ( "errors" "io" "net/url" - "strings" "time" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" + "github.com/synadia-io/nex/internal/models" ) -// Executable Linkable Format execution provider -const NexExecutionProviderELF = "elf" - -// V8 execution provider -const NexExecutionProviderV8 = "v8" - -// OCI execution provider -const NexExecutionProviderOCI = "oci" - -// Wasm execution provider -const NexExecutionProviderWasm = "wasm" - // Name of the internal, non-public bucket for sharing files between host and agent const WorkloadCacheBucket = "NEXCACHE" @@ -56,19 +44,19 @@ type ExecutionProviderParams struct { // DeployRequest processed by the agent type DeployRequest struct { - Argv []string `json:"argv,omitempty"` - DecodedClaims jwt.GenericClaims `json:"-"` - Description *string `json:"description"` - Environment map[string]string `json:"environment"` - Essential *bool `json:"essential,omitempty"` - Hash string `json:"hash,omitempty"` - Namespace *string `json:"namespace,omitempty"` - RetriedAt *time.Time `json:"retried_at,omitempty"` - RetryCount *uint `json:"retry_count,omitempty"` - TotalBytes int64 `json:"total_bytes,omitempty"` - TriggerSubjects []string `json:"trigger_subjects"` - WorkloadName *string `json:"workload_name,omitempty"` - WorkloadType *string `json:"workload_type,omitempty"` + Argv []string `json:"argv,omitempty"` + DecodedClaims jwt.GenericClaims `json:"-"` + Description *string `json:"description"` + Environment map[string]string `json:"environment"` + Essential *bool `json:"essential,omitempty"` + Hash string `json:"hash,omitempty"` + Namespace *string `json:"namespace,omitempty"` + RetriedAt *time.Time `json:"retried_at,omitempty"` + RetryCount *uint `json:"retry_count,omitempty"` + TotalBytes int64 `json:"total_bytes,omitempty"` + TriggerSubjects []string `json:"trigger_subjects"` + WorkloadName *string `json:"workload_name,omitempty"` + WorkloadType models.NexWorkload `json:"workload_type,omitempty"` Stderr io.Writer `json:"-"` Stdout io.Writer `json:"-"` @@ -90,14 +78,14 @@ func (request *DeployRequest) IsEssential() bool { // Returns true if the run request supports essential flag func (request *DeployRequest) SupportsEssential() bool { - return strings.EqualFold(*request.WorkloadType, "elf") || - strings.EqualFold(*request.WorkloadType, "oci") + return request.WorkloadType == models.NexWorkloadNative || + request.WorkloadType == models.NexWorkloadOCI } // Returns true if the run request supports trigger subjects func (request *DeployRequest) SupportsTriggerSubjects() bool { - return (strings.EqualFold(*request.WorkloadType, "v8") || - strings.EqualFold(*request.WorkloadType, "wasm")) && + return (request.WorkloadType == models.NexWorkloadV8 || + request.WorkloadType == models.NexWorkloadWasm) && len(request.TriggerSubjects) > 0 } @@ -124,10 +112,10 @@ func (r *DeployRequest) Validate() error { err = errors.Join(err, errors.New("total bytes is required")) } - if r.WorkloadType == nil { + if r.WorkloadType == "" { err = errors.Join(err, errors.New("workload type is required")) - } else if (strings.EqualFold(*r.WorkloadType, NexExecutionProviderV8) || - strings.EqualFold(*r.WorkloadType, NexExecutionProviderWasm)) && + } else if (r.WorkloadType == models.NexWorkloadV8 || + r.WorkloadType == models.NexWorkloadWasm) && len(r.TriggerSubjects) == 0 { err = errors.Join(err, errors.New("at least one trigger subject is required for this workload type")) } diff --git a/internal/models/cli.go b/internal/models/cli.go index 0f4f6f10..2c41ce62 100644 --- a/internal/models/cli.go +++ b/internal/models/cli.go @@ -70,7 +70,7 @@ type RunOptions struct { TargetNode string WorkloadUrl *url.URL Name string - WorkloadType string + WorkloadType NexWorkload Description string PublisherXkeyFile string ClaimsIssuerFile string diff --git a/internal/models/config.go b/internal/models/config.go index 6e02ded0..51084f13 100644 --- a/internal/models/config.go +++ b/internal/models/config.go @@ -9,7 +9,6 @@ import ( "github.com/nats-io/nats-server/v2/server" "github.com/splode/fname" - agentapi "github.com/synadia-io/nex/internal/agent-api" ) const ( @@ -25,8 +24,7 @@ const ( ) var ( - // docker/OCI needs to be explicitly enabled in node configuration - DefaultWorkloadTypes = []string{"elf", "v8", "wasm"} + DefaultWorkloadTypes = []NexWorkload{NexWorkloadNative} DefaultBinPath = append([]string{"/usr/local/bin"}, filepath.SplitList(os.Getenv("PATH"))...) @@ -59,7 +57,7 @@ type NodeConfiguration struct { RootFsFilepath string `json:"rootfs_filepath"` Tags map[string]string `json:"tags,omitempty"` ValidIssuers []string `json:"valid_issuers,omitempty"` - WorkloadTypes []string `json:"workload_types,omitempty"` + WorkloadTypes []NexWorkload `json:"workload_types,omitempty"` HostServicesConfiguration *HostServicesConfig `json:"host_services,omitempty"` // Public NATS server options; when non-nil, a public "userland" NATS server is started during node init @@ -132,7 +130,7 @@ func DefaultNodeConfiguration() NodeConfiguration { BinPath: DefaultBinPath, // CAUTION: This needs to be the IP of the node server's internal NATS --as visible to the agent. // This is not necessarily the address on which the internal NATS server is actually listening inside the node. - InternalNodeHost: agentapi.StringOrNil(DefaultInternalNodeHost), + InternalNodeHost: StringOrNil(DefaultInternalNodeHost), InternalNodePort: &defaultNodePort, MachinePoolSize: 1, MachineTemplate: MachineTemplate{ @@ -171,9 +169,9 @@ func DefaultNodeConfiguration() NodeConfiguration { if !config.NoSandbox { config.CNI = CNIDefinition{ BinPath: DefaultCNIBinPath, - NetworkName: agentapi.StringOrNil(DefaultCNINetworkName), - InterfaceName: agentapi.StringOrNil(DefaultCNIInterfaceName), - Subnet: agentapi.StringOrNil(DefaultCNISubnet), + NetworkName: StringOrNil(DefaultCNINetworkName), + InterfaceName: StringOrNil(DefaultCNIInterfaceName), + Subnet: StringOrNil(DefaultCNISubnet), } } diff --git a/internal/models/node_capabilities.go b/internal/models/node_capabilities.go new file mode 100644 index 00000000..1b05e958 --- /dev/null +++ b/internal/models/node_capabilities.go @@ -0,0 +1,16 @@ +package models + +type NexWorkload string + +const ( + NexWorkloadNative NexWorkload = "native" + NexWorkloadV8 NexWorkload = "v8" + NexWorkloadOCI NexWorkload = "oci" + NexWorkloadWasm NexWorkload = "wasm" +) + +type NodeCapabilities struct { + Sandboxable bool `json:"sandboxable"` + SupportedProviders []NexWorkload `json:"supported_providers"` + NodeTags map[string]string `json:"node_tags"` +} diff --git a/internal/models/node_darwin.go b/internal/models/node_darwin.go new file mode 100644 index 00000000..c93d7027 --- /dev/null +++ b/internal/models/node_darwin.go @@ -0,0 +1,13 @@ +package models + +func GetNodeCapabilities(tags map[string]string) *NodeCapabilities { + return &NodeCapabilities{ + Sandboxable: false, + SupportedProviders: []NexWorkload{ + NexWorkloadNative, + NexWorkloadOCI, + NexWorkloadWasm, + }, + NodeTags: tags, + } +} diff --git a/internal/models/node_linux_amd64.go b/internal/models/node_linux_amd64.go new file mode 100644 index 00000000..97426b9d --- /dev/null +++ b/internal/models/node_linux_amd64.go @@ -0,0 +1,14 @@ +package models + +func GetNodeCapabilities(tags map[string]string) *NodeCapabilities { + return &NodeCapabilities{ + Sandboxable: true, + SupportedProviders: []NexWorkload{ + NexWorkloadNative, + NexWorkloadOCI, + NexWorkloadWasm, + NexWorkloadV8, + }, + NodeTags: tags, + } +} diff --git a/internal/models/node_linux_arm64.go b/internal/models/node_linux_arm64.go new file mode 100644 index 00000000..a6a23406 --- /dev/null +++ b/internal/models/node_linux_arm64.go @@ -0,0 +1,13 @@ +package models + +func GetNodeCapabilities(tags map[string]string) *NodeCapabilities { + return &NodeCapabilities{ + Sandboxable: true, + SupportedProviders: []NexWorkload{ + NexWorkloadNative, + NexWorkloadOCI, + NexWorkloadWasm, + }, + NodeTags: tags, + } +} diff --git a/internal/models/node_windows.go b/internal/models/node_windows.go new file mode 100644 index 00000000..7ef588b4 --- /dev/null +++ b/internal/models/node_windows.go @@ -0,0 +1,11 @@ +package models + +func GetNodeCapabilities(tags map[string]string) *NodeCapabilities { + return &NodeCapabilities{ + Sandboxable: false, + SupportedProviders: []NexWorkload{ + NexWorkloadNative, + }, + NodeTags: tags, + } +} diff --git a/internal/agent-api/utils.go b/internal/models/utils.go similarity index 88% rename from internal/agent-api/utils.go rename to internal/models/utils.go index d56ac27d..6c65dfba 100644 --- a/internal/agent-api/utils.go +++ b/internal/models/utils.go @@ -1,4 +1,4 @@ -package agentapi +package models // returns the given string or nil if empty func StringOrNil(str string) *string { diff --git a/internal/node/config.go b/internal/node/config.go index 0f8e80e5..229c91a7 100644 --- a/internal/node/config.go +++ b/internal/node/config.go @@ -25,7 +25,7 @@ func LoadNodeConfiguration(configFilepath string) (*models.NodeConfiguration, er } if len(config.WorkloadTypes) == 0 { - config.WorkloadTypes = models.DefaultWorkloadTypes + config.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative} } if strings.EqualFold(runtime.GOOS, "windows") && !config.NoSandbox { diff --git a/internal/node/controlapi.go b/internal/node/controlapi.go index dd55e629..c2e9829d 100644 --- a/internal/node/controlapi.go +++ b/internal/node/controlapi.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" controlapi "github.com/synadia-io/nex/control-api" agentapi "github.com/synadia-io/nex/internal/agent-api" + "github.com/synadia-io/nex/internal/models" ) // The API listener is the command and control interface for the node server @@ -224,16 +225,16 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) { return } - if !slices.Contains(api.node.config.WorkloadTypes, *request.WorkloadType) { - api.log.Error("This node does not support the given workload type", slog.String("workload_type", *request.WorkloadType)) - respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type on this node: %s", *request.WorkloadType)) + if !slices.Contains(api.node.config.WorkloadTypes, request.WorkloadType) { + api.log.Error("This node does not support the given workload type", slog.String("workload_type", string(request.WorkloadType))) + respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type on this node: %s", string(request.WorkloadType))) return } - if len(request.TriggerSubjects) > 0 && (!strings.EqualFold(*request.WorkloadType, "v8") && - !strings.EqualFold(*request.WorkloadType, "wasm")) { // FIXME -- workload type comparison - api.log.Error("Workload type does not support trigger subject registration", slog.String("trigger_subjects", *request.WorkloadType)) - respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type for trigger subject registration: %s", *request.WorkloadType)) + if len(request.TriggerSubjects) > 0 && (request.WorkloadType != models.NexWorkloadV8 && + request.WorkloadType != models.NexWorkloadWasm) { // FIXME -- workload type comparison + api.log.Error("Workload type does not support trigger subject registration", slog.String("trigger_subjects", string(request.WorkloadType))) + respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type for trigger subject registration: %s", string(request.WorkloadType))) return } @@ -294,7 +295,7 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) { slog.String("workload", *deployRequest.WorkloadName), slog.Uint64("workload_size", numBytes), slog.String("workload_sha256", *workloadHash), - slog.String("type", *request.WorkloadType), + slog.String("type", string(request.WorkloadType)), ) workloadID, err := api.mgr.DeployWorkload(deployRequest) diff --git a/internal/node/node.go b/internal/node/node.go index c451a48c..5b57d7dd 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -70,6 +70,8 @@ type Node struct { startedAt time.Time telemetry *observability.Telemetry + + capabilities models.NodeCapabilities } func NewNode( @@ -102,7 +104,9 @@ func NewNode( if err != nil { return nil, fmt.Errorf("failed to extract public key: %s", err.Error()) } + node.nexus = nodeOpts.NexusName + node.capabilities = *models.GetNodeCapabilities(node.config.Tags) return node, nil } diff --git a/internal/node/processmanager/firecracker_procman.go b/internal/node/processmanager/firecracker_procman.go index 3fbc9d3c..3d991096 100644 --- a/internal/node/processmanager/firecracker_procman.go +++ b/internal/node/processmanager/firecracker_procman.go @@ -196,8 +196,8 @@ func (f *FirecrackerProcessManager) StopProcess(workloadID string) error { delete(f.stopMutex, workloadID) if vm.deployRequest != nil { - f.t.WorkloadCounter.Add(f.ctx, -1, metric.WithAttributes(attribute.String("workload_type", *vm.deployRequest.WorkloadType))) - f.t.WorkloadCounter.Add(f.ctx, -1, metric.WithAttributes(attribute.String("workload_type", *vm.deployRequest.WorkloadType)), metric.WithAttributes(attribute.String("namespace", vm.namespace))) + f.t.WorkloadCounter.Add(f.ctx, -1, metric.WithAttributes(attribute.String("workload_type", string(vm.deployRequest.WorkloadType)))) + f.t.WorkloadCounter.Add(f.ctx, -1, metric.WithAttributes(attribute.String("workload_type", string(vm.deployRequest.WorkloadType))), metric.WithAttributes(attribute.String("namespace", vm.namespace))) f.t.DeployedByteCounter.Add(f.ctx, vm.deployRequest.TotalBytes*-1) f.t.DeployedByteCounter.Add(f.ctx, vm.deployRequest.TotalBytes*-1, metric.WithAttributes(attribute.String("namespace", vm.namespace))) } @@ -262,7 +262,7 @@ func (f *FirecrackerProcessManager) cleanSockets() { func (f *FirecrackerProcessManager) setMetadata(vm *runningFirecracker) error { return vm.setMetadata(&agentapi.MachineMetadata{ - Message: agentapi.StringOrNil("Host-supplied metadata"), + Message: models.StringOrNil("Host-supplied metadata"), NodeNatsHost: vm.config.InternalNodeHost, NodeNatsPort: vm.config.InternalNodePort, VmID: &vm.vmmID, diff --git a/internal/node/workload_mgr.go b/internal/node/workload_mgr.go index 5569550a..a62a5f62 100644 --- a/internal/node/workload_mgr.go +++ b/internal/node/workload_mgr.go @@ -239,7 +239,7 @@ func (w *WorkloadManager) DeployWorkload(request *agentapi.DeployRequest) (*stri w.log.Error("Failed to create trigger subject subscription for deployed workload", slog.String("workload_id", workloadID), slog.String("trigger_subject", tsub), - slog.String("workload_type", *request.WorkloadType), + slog.String("workload_type", string(request.WorkloadType)), slog.Any("err", err), ) _ = w.StopWorkload(workloadID, true) @@ -249,7 +249,7 @@ func (w *WorkloadManager) DeployWorkload(request *agentapi.DeployRequest) (*stri w.log.Info("Created trigger subject subscription for deployed workload", slog.String("workload_id", workloadID), slog.String("trigger_subject", tsub), - slog.String("workload_type", *request.WorkloadType), + slog.String("workload_type", string(request.WorkloadType)), ) w.subz[workloadID] = append(w.subz[workloadID], sub) @@ -260,8 +260,8 @@ func (w *WorkloadManager) DeployWorkload(request *agentapi.DeployRequest) (*stri return nil, fmt.Errorf("workload rejected by agent: %s", *deployResponse.Message) } - w.t.WorkloadCounter.Add(w.ctx, 1, metric.WithAttributes(attribute.String("workload_type", *request.WorkloadType))) - w.t.WorkloadCounter.Add(w.ctx, 1, metric.WithAttributes(attribute.String("namespace", *request.Namespace)), metric.WithAttributes(attribute.String("workload_type", *request.WorkloadType))) + w.t.WorkloadCounter.Add(w.ctx, 1, metric.WithAttributes(attribute.String("workload_type", string(request.WorkloadType)))) + w.t.WorkloadCounter.Add(w.ctx, 1, metric.WithAttributes(attribute.String("namespace", *request.Namespace)), metric.WithAttributes(attribute.String("workload_type", string(request.WorkloadType)))) w.t.DeployedByteCounter.Add(w.ctx, request.TotalBytes) w.t.DeployedByteCounter.Add(w.ctx, request.TotalBytes, metric.WithAttributes(attribute.String("namespace", *request.Namespace))) @@ -289,7 +289,7 @@ func (w *WorkloadManager) RunningWorkloads() ([]controlapi.MachineSummary, error agentClient, ok := w.activeAgents[p.ID] if ok { uptimeFriendly = myUptime(agentClient.UptimeMillis()) - if *p.DeployRequest.WorkloadType == "v8" || *p.DeployRequest.WorkloadType == "wasm" { + if p.DeployRequest.WorkloadType == models.NexWorkloadV8 || p.DeployRequest.WorkloadType == models.NexWorkloadWasm { nanoTime := fmt.Sprintf("%dns", agentClient.ExecTimeNanos()) rt, err := time.ParseDuration(nanoTime) if err == nil { @@ -317,7 +317,7 @@ func (w *WorkloadManager) RunningWorkloads() ([]controlapi.MachineSummary, error Name: p.Name, Description: *p.DeployRequest.Description, Runtime: runtimeFriendly, - WorkloadType: *p.DeployRequest.WorkloadType, + WorkloadType: p.DeployRequest.WorkloadType, Hash: p.DeployRequest.Hash, }, } @@ -496,7 +496,7 @@ func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string, w.log.Error("Failed to request agent execution via internal trigger subject", slog.Any("err", err), slog.String("trigger_subject", tsub), - slog.String("workload_type", *request.WorkloadType), + slog.String("workload_type", string(request.WorkloadType)), slog.String("workload_id", workloadID), ) @@ -510,7 +510,7 @@ func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string, w.log.Debug("Received response from execution via trigger subject", slog.String("workload_id", workloadID), slog.String("trigger_subject", tsub), - slog.String("workload_type", *request.WorkloadType), + slog.String("workload_type", string(request.WorkloadType)), slog.String("function_run_time_nanosec", runtimeNs), slog.Int("payload_size", len(resp.Data)), ) @@ -538,7 +538,7 @@ func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string, w.log.Error("Failed to respond to trigger subject subscription request for deployed workload", slog.String("workload_id", workloadID), slog.String("trigger_subject", tsub), - slog.String("workload_type", *request.WorkloadType), + slog.String("workload_type", string(request.WorkloadType)), slog.Any("err", err), ) } diff --git a/internal/node/workload_mgr_events.go b/internal/node/workload_mgr_events.go index 3446188f..16b3492b 100644 --- a/internal/node/workload_mgr_events.go +++ b/internal/node/workload_mgr_events.go @@ -49,7 +49,7 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) { slog.String("vmid", agentId), slog.String("namespace", *deployRequest.Namespace), slog.String("workload", *deployRequest.WorkloadName), - slog.String("workload_type", *deployRequest.WorkloadType)) + slog.String("workload_type", string(deployRequest.WorkloadType))) if deployRequest.RetryCount == nil { retryCount := uint(0) diff --git a/nex/devrunner.go b/nex/devrunner.go index 3e10f8d2..507d3764 100644 --- a/nex/devrunner.go +++ b/nex/devrunner.go @@ -16,7 +16,6 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" controlapi "github.com/synadia-io/nex/control-api" - agentapi "github.com/synadia-io/nex/internal/agent-api" "github.com/synadia-io/nex/internal/models" ) @@ -26,7 +25,7 @@ var ( const ( defaultFileMode = os.FileMode(int(0770)) // owner and group r/w/x - defaultWorkloadType = agentapi.NexExecutionProviderELF + defaultWorkloadType = models.NexWorkloadNative fileExtensionJS = "js" fileExtensionWasm = "wasm" diff --git a/nex/nex.go b/nex/nex.go index d162002a..b54e9fb1 100644 --- a/nex/nex.go +++ b/nex/nex.go @@ -66,6 +66,8 @@ var ( WatchOpts = &models.WatchOptions{} NodeOpts = &models.NodeOptions{} RootfsOpts = &models.RootfsOptions{} + + workloadType string ) func init() { @@ -101,7 +103,7 @@ func init() { run.Flag("issuer", "Path to a seed key to sign the workload JWT as the issuer").Required().ExistingFileVar(&RunOpts.ClaimsIssuerFile) run.Arg("env", "Environment variables to pass to workload").StringMapVar(&RunOpts.Env) run.Flag("name", "Name of the workload. Must be alphabetic (lowercase)").Required().StringVar(&RunOpts.Name) - run.Flag("type", "Type of workload").EnumVar(&RunOpts.WorkloadType, "elf", "v8", "wasm") + run.Flag("type", "Type of workload").Default("native").EnumVar(&workloadType, "native", "v8", "wasm") run.Flag("description", "Description of the workload").StringVar(&RunOpts.Description) run.Flag("argv", "Arguments to pass to the workload, if applicable").StringVar(&RunOpts.Argv) run.Flag("essential", "When true, workload is redeployed if it exits with a non-zero status").BoolVar(&RunOpts.Essential) @@ -114,7 +116,7 @@ func init() { yeet.Flag("trigger_subject", "Trigger subjects to register for subsequent workload execution, if supported by the workload type").StringsVar(&RunOpts.TriggerSubjects) yeet.Flag("stop", "Indicates whether to stop pre-existing workloads during launch. Disable with caution").Default("true").BoolVar(&DevRunOpts.AutoStop) yeet.Flag("bucketmaxbytes", "Overrides the default max bytes if the dev object store bucket is created").UintVar(&DevRunOpts.DevBucketMaxBytes) - yeet.Flag("type", "Type of workload").Default("elf").EnumVar(&RunOpts.WorkloadType, "elf", "v8", "wasm") + yeet.Flag("type", "Type of workload").Default("native").EnumVar(&workloadType, "native", "v8", "wasm") stop.Arg("id", "Public key of the target node on which to stop the workload").Required().StringVar(&StopOpts.TargetNode) stop.Arg("workload_id", "Unique ID of the workload to be stopped").Required().StringVar(&StopOpts.WorkloadId) @@ -144,6 +146,17 @@ func main() { setConditionalCommands() cmd := fisk.MustParse(ncli.Parse(os.Args[1:])) + switch workloadType { + case "native": + RunOpts.WorkloadType = models.NexWorkloadNative + case "v8": + RunOpts.WorkloadType = models.NexWorkloadV8 + case "oci": + RunOpts.WorkloadType = models.NexWorkloadOCI + case "wasm": + RunOpts.WorkloadType = models.NexWorkloadWasm + } + ctx := context.Background() var handlerOpts []shandler.HandlerOption diff --git a/nex/tui/home/data.go b/nex/tui/home/data.go index b896048d..c8daea40 100644 --- a/nex/tui/home/data.go +++ b/nex/tui/home/data.go @@ -163,7 +163,7 @@ func (w workload) String() string { ret.WriteString("\n") ret.WriteString("\tRuntime: " + w.Workload.Runtime) ret.WriteString("\n") - ret.WriteString("\tWorkload Type: " + w.Workload.WorkloadType) + ret.WriteString("\tWorkload Type: " + string(w.Workload.WorkloadType)) ret.WriteString("\n") ret.WriteString("\n") return ret.String() diff --git a/spec/node_linux_test.go b/spec/node_linux_test.go index e2949aac..fc3e74d8 100644 --- a/spec/node_linux_test.go +++ b/spec/node_linux_test.go @@ -135,6 +135,7 @@ var _ = Describe("nex node", func() { Context("when the specified node configuration file exists", func() { BeforeEach(func() { nodeConfig = models.DefaultNodeConfiguration() + nodeConfig.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative, models.NexWorkloadV8, models.NexWorkloadWasm} nodeOpts.ConfigFilepath = path.Join(os.TempDir(), fmt.Sprintf("%d-spec-nex-conf.json", _fixtures.seededRand.Int())) }) @@ -229,6 +230,7 @@ var _ = Describe("nex node", func() { Context("when the specified node configuration file exists", func() { BeforeEach(func() { nodeConfig = models.DefaultNodeConfiguration() + nodeConfig.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative, models.NexWorkloadV8, models.NexWorkloadWasm} nodeOpts.ConfigFilepath = path.Join(os.TempDir(), fmt.Sprintf("%d-spec-nex-conf.json", _fixtures.seededRand.Int())) nodeConfig.NoSandbox = !sandbox @@ -466,7 +468,7 @@ var _ = Describe("nex node", func() { // }) }) - Describe("deploying an ELF binary workload", func() { + Describe("deploying an Native binary workload", func() { var deployRequest *controlapi.DeployRequest var err error @@ -484,16 +486,16 @@ var _ = Describe("nex node", func() { time.Sleep(time.Millisecond * 1000) }) - Context("when the ELF binary is not statically-linked", func() { + Context("when the Native binary is not statically-linked", func() { BeforeEach(func() { cmd := exec.Command("go", "build", "../examples/echoservice") _ = cmd.Start() _ = cmd.Wait() }) - It("should [fail to] deploy the ELF workload", func(ctx SpecContext) { + It("should [fail to] deploy the Native workload", func(ctx SpecContext) { if sandbox { - Expect(err.Error()).To(ContainSubstring("elf binary contains at least one dynamically linked dependency")) + Expect(err.Error()).To(ContainSubstring("native binary contains at least one dynamically linked dependency")) } else { Expect(err).To(BeNil()) } @@ -928,7 +930,7 @@ var _ = Describe("nex node", func() { ) }) -func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, string, error) { +func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, models.NexWorkload, error) { js, err := nc.JetStream() if err != nil { panic(err) @@ -956,14 +958,14 @@ func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, stri return "", "", "", err } - var workloadType string + var workloadType models.NexWorkload switch strings.Replace(filepath.Ext(filename), ".", "", 1) { case "js": - workloadType = agentapi.NexExecutionProviderV8 + workloadType = models.NexWorkloadV8 case "wasm": - workloadType = agentapi.NexExecutionProviderWasm + workloadType = models.NexWorkloadWasm default: - workloadType = "elf" + workloadType = models.NexWorkloadNative } return fmt.Sprintf("nats://%s/%s", "NEXCLIFILES", key), key, workloadType, nil diff --git a/spec/node_windows_test.go b/spec/node_windows_test.go index 7ee9364b..80a3de01 100644 --- a/spec/node_windows_test.go +++ b/spec/node_windows_test.go @@ -87,6 +87,7 @@ var _ = Describe("nex node", func() { BeforeEach(func() { nodeOpts.ConfigFilepath = filepath.Join(os.TempDir(), fmt.Sprintf("%d-non-existent-nex-conf.json", _fixtures.seededRand.Int())) nodeConfig.NoSandbox = true + nodeConfig.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative, models.NexWorkloadV8, models.NexWorkloadWasm} }) It("should not return an error", func(ctx SpecContext) { @@ -100,6 +101,7 @@ var _ = Describe("nex node", func() { nodeConfig = models.DefaultNodeConfiguration() nodeConfig.NoSandbox = true nodeOpts.ConfigFilepath = path.Join(os.TempDir(), fmt.Sprintf("%d-spec-nex-conf.json", _fixtures.seededRand.Int())) + nodeConfig.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative, models.NexWorkloadV8, models.NexWorkloadWasm} }) JustBeforeEach(func() { @@ -116,6 +118,7 @@ var _ = Describe("nex node", func() { Context("when the specified default_resource_dir does not exist on the host", func() { BeforeEach(func() { nodeConfig.DefaultResourceDir = filepath.Join(os.TempDir(), fmt.Sprintf("%d-non-existent-nex-resource-dir", _fixtures.seededRand.Int())) + nodeConfig.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative, models.NexWorkloadV8, models.NexWorkloadWasm} }) It("should not return an error", func(ctx SpecContext) { @@ -179,6 +182,7 @@ var _ = Describe("nex node", func() { BeforeEach(func() { nodeConfig = models.DefaultNodeConfiguration() nodeOpts.ConfigFilepath = path.Join(os.TempDir(), fmt.Sprintf("%d-spec-nex-conf.json", _fixtures.seededRand.Int())) + nodeConfig.WorkloadTypes = []models.NexWorkload{models.NexWorkloadNative, models.NexWorkloadV8, models.NexWorkloadWasm} nodeConfig.NoSandbox = !sandbox nodeKey, _ = nkeys.CreateServer() @@ -474,7 +478,7 @@ var _ = Describe("nex node", func() { ) }) -func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, string, error) { +func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, models.NexWorkload, error) { js, err := nc.JetStream() if err != nil { panic(err) @@ -502,16 +506,16 @@ func cacheWorkloadArtifact(nc *nats.Conn, filename string) (string, string, stri return "", "", "", err } - var workloadType string + var workloadType models.NexWorkload switch strings.Replace(filepath.Ext(filename), ".", "", 1) { case "exe": - workloadType = "elf" + workloadType = models.NexWorkloadNative case "js": - workloadType = agentapi.NexExecutionProviderV8 + workloadType = models.NexWorkloadV8 case "wasm": - workloadType = agentapi.NexExecutionProviderWasm + workloadType = models.NexWorkloadWasm default: - workloadType = "elf" + workloadType = models.NexWorkloadNative } return fmt.Sprintf("nats://%s/%s", "NEXCLIFILES", key), key, workloadType, nil diff --git a/test/wasm_test.go b/test/wasm_test.go index 5e00b023..f42e7bab 100644 --- a/test/wasm_test.go +++ b/test/wasm_test.go @@ -6,18 +6,19 @@ import ( "github.com/synadia-io/nex/agent/providers/lib" agentapi "github.com/synadia-io/nex/internal/agent-api" + "github.com/synadia-io/nex/internal/models" ) func TestWasmExecution(t *testing.T) { file := "../examples/wasm/echofunction/echofunction.wasm" - typ := "wasm" + typ := models.NexWorkloadWasm params := &agentapi.ExecutionProviderParams{ DeployRequest: agentapi.DeployRequest{ Environment: map[string]string{}, Hash: "", TotalBytes: 0, WorkloadName: new(string), - WorkloadType: new(string), + WorkloadType: typ, Stderr: nil, Stdout: nil, TmpFilename: &file, @@ -33,7 +34,7 @@ func TestWasmExecution(t *testing.T) { NATSConn: nil, // FIXME } - params.DeployRequest.WorkloadType = &typ + params.DeployRequest.WorkloadType = typ wasm, err := lib.InitNexExecutionProviderWasm(params) if err != nil { t.Fatalf("Failed to instantiate wasm provider: %s", err)