From a53cf0f758be4b87fef00b1499a37d596a2b7909 Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Fri, 2 Aug 2024 07:47:55 -0400 Subject: [PATCH 1/4] stops infinite recursion. attempts to clarify difference between the 2 deploy requests --- agent/agent.go | 16 +++--- agent/providers/plugins_test.go | 2 +- internal/agent-api/client.go | 18 ++++-- internal/agent-api/types.go | 21 +++---- internal/node/node.go | 41 ++++++-------- .../processmanager/firecracker_procman.go | 6 +- internal/node/processmanager/process_mgr.go | 4 +- internal/node/processmanager/running_vm.go | 2 +- internal/node/processmanager/spawn_procman.go | 8 +-- internal/node/workload_mgr.go | 8 +-- internal/node/workload_mgr_events.go | 56 +++++++++---------- test/wasm_test.go | 4 +- 12 files changed, 90 insertions(+), 96 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index b784350f..3d4b8e4d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -175,7 +175,7 @@ func (a *Agent) Version() string { // the executable workload artifact from the cache bucket, write it to a // temporary file and make it executable; this method returns the full // path to the cached artifact if successful -func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, error) { +func (a *Agent) cacheExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) { fileName := fmt.Sprintf("workload-%s", *a.md.VmID) tempFile := path.Join(os.TempDir(), fileName) @@ -260,7 +260,7 @@ func (a *Agent) dispatchLogs() { // bucket, write it to tmp, initialize the execution provider per the // request, and then validate and deploy a workload func (a *Agent) handleDeploy(m *nats.Msg) { - var request agentapi.DeployRequest + var request agentapi.AgentWorkloadInfo err := json.Unmarshal(m.Data, &request) if err != nil { msg := fmt.Sprintf("Failed to unmarshal deploy request: %s", err) @@ -455,7 +455,7 @@ func (a *Agent) installSignalHandlers() { // newExecutionProviderParams initializes new execution provider params // for the given work request and starts a goroutine listening -func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile string) (*agentapi.ExecutionProviderParams, error) { +func (a *Agent) newExecutionProviderParams(req *agentapi.AgentWorkloadInfo, tmpFile string) (*agentapi.ExecutionProviderParams, error) { if a.md.VmID == nil { return nil, errors.New("vm id is required to initialize execution provider params") } @@ -465,11 +465,11 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile } params := &agentapi.ExecutionProviderParams{ - DeployRequest: *req, - Stderr: &logEmitter{stderr: true, name: *req.WorkloadName, logs: a.agentLogs}, - Stdout: &logEmitter{stderr: false, name: *req.WorkloadName, logs: a.agentLogs}, - TmpFilename: &tmpFile, - VmID: *a.md.VmID, + AgentWorkloadInfo: *req, + Stderr: &logEmitter{stderr: true, name: *req.WorkloadName, logs: a.agentLogs}, + Stdout: &logEmitter{stderr: false, name: *req.WorkloadName, logs: a.agentLogs}, + TmpFilename: &tmpFile, + VmID: *a.md.VmID, Fail: make(chan bool), Run: make(chan bool), diff --git a/agent/providers/plugins_test.go b/agent/providers/plugins_test.go index 1cf359c7..1dd9dbfb 100644 --- a/agent/providers/plugins_test.go +++ b/agent/providers/plugins_test.go @@ -13,7 +13,7 @@ func TestNoopPluginLoad(t *testing.T) { plugPath := "../../test/fixtures" wName := "echofunctionjs" params := &agentapi.ExecutionProviderParams{ - DeployRequest: agentapi.DeployRequest{ + AgentWorkloadInfo: agentapi.AgentWorkloadInfo{ WorkloadName: &wName, WorkloadType: "noop", }, diff --git a/internal/agent-api/client.go b/internal/agent-api/client.go index 98e2cf5c..213052b5 100644 --- a/internal/agent-api/client.go +++ b/internal/agent-api/client.go @@ -27,6 +27,7 @@ type ContactLostCallback func(string) const ( defaultAgentPingIntervalMillis = 5000 + maxRetryAttempts = 3 NexTriggerSubject = "x-nex-trigger-subject" NexRuntimeNs = "x-nex-runtime-ns" @@ -56,9 +57,10 @@ type AgentClient struct { execTotalNanos int64 workloadStartedAt time.Time - deployRequest *DeployRequest - workloadBytes uint64 - subz []*nats.Subscription + deployRequest *AgentWorkloadInfo + workloadBytes uint64 + subz []*nats.Subscription + deployRetryCount uint selected bool // FIXME-- rename... } @@ -82,6 +84,7 @@ func NewAgentClient( handshakeSucceeded: onSuccess, log: log, logReceived: onLog, + deployRetryCount: 0, nc: nc, pingTimeout: pingTimeout, subz: make([]*nats.Subscription, 0), @@ -130,7 +133,11 @@ func (a *AgentClient) Start(agentID string) error { return nil } -func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, error) { +func (a *AgentClient) DeployWorkload(request *AgentWorkloadInfo) (*DeployResponse, error) { + if a.deployRetryCount > maxRetryAttempts { + return nil, fmt.Errorf("exceeded maximum number of agent workload deploy attempts: %d", maxRetryAttempts) + } + bytes, err := json.Marshal(request) if err != nil { return nil, err @@ -148,6 +155,7 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e return nil, errors.New("timed out waiting for acknowledgement of workload deployment") } else if errors.Is(err, nats.ErrNoResponders) { time.Sleep(time.Millisecond * 100) + a.deployRetryCount += 1 return a.DeployWorkload(request) } else { return nil, fmt.Errorf("failed to submit request for workload deployment: %s", err) @@ -250,7 +258,7 @@ func (a *AgentClient) UptimeMillis() time.Duration { return time.Since(a.workloadStartedAt) } -func (a *AgentClient) DeployRequest() *DeployRequest { +func (a *AgentClient) DeployRequest() *AgentWorkloadInfo { return a.deployRequest } diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index c2364645..5d08e9f4 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -21,7 +21,7 @@ const DefaultRunloopSleepTimeoutMillis = 25 // ExecutionProviderParams parameters for initializing a specific execution provider type ExecutionProviderParams struct { - DeployRequest + AgentWorkloadInfo TriggerSubjects []string `json:"trigger_subjects"` // Fail channel receives bool upon command failing to start @@ -45,8 +45,8 @@ type ExecutionProviderParams struct { PluginPath *string `json:"-"` } -// DeployRequest processed by the agent -type DeployRequest struct { +// AgentWorkloadInfo processed by the agent +type AgentWorkloadInfo struct { Argv []string `json:"argv,omitempty"` DecodedClaims jwt.GenericClaims `json:"-"` Description *string `json:"description"` @@ -68,34 +68,29 @@ type DeployRequest struct { Stdout io.Writer `json:"-"` TmpFilename *string `json:"-"` - EncryptedEnvironment *string `json:"-"` - JsDomain *string `json:"-"` - Location *url.URL `json:"-"` - SenderPublicKey *string `json:"-"` - TargetNode *string `json:"-"` - WorkloadJwt *string `json:"-"` + Location *url.URL `json:"-"` Errors []error `json:"errors,omitempty"` } -func (request *DeployRequest) IsEssential() bool { +func (request *AgentWorkloadInfo) IsEssential() bool { return request.Essential != nil && *request.Essential } // Returns true if the run request supports essential flag -func (request *DeployRequest) SupportsEssential() bool { +func (request *AgentWorkloadInfo) SupportsEssential() bool { return request.WorkloadType == controlapi.NexWorkloadNative || request.WorkloadType == controlapi.NexWorkloadOCI } // Returns true if the run request supports trigger subjects -func (request *DeployRequest) SupportsTriggerSubjects() bool { +func (request *AgentWorkloadInfo) SupportsTriggerSubjects() bool { return (request.WorkloadType == controlapi.NexWorkloadV8 || request.WorkloadType == controlapi.NexWorkloadWasm) && len(request.TriggerSubjects) > 0 } -func (r *DeployRequest) Validate() error { +func (r *AgentWorkloadInfo) Validate() error { var err error if r.Namespace == nil { diff --git a/internal/node/node.go b/internal/node/node.go index 1d99eacd..110164e5 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -695,28 +695,23 @@ func (n *Node) shuttingDown() bool { // For the curious - I tried a number of ways of making a common/shared deploy request and only modeling the differences // here, but it made all the code that creates deployment requests look hideous. Will look into cleaning this up again. -func agentDeployRequestFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.DeployRequest { - return &agentapi.DeployRequest{ - Argv: request.Argv, - DecodedClaims: request.DecodedClaims, - Description: request.Description, - EncryptedEnvironment: request.Environment, - Environment: request.WorkloadEnvironment, - Essential: request.Essential, - Hash: hash, - HostServicesConfig: request.HostServicesConfig, - ID: request.ID, - JsDomain: request.JsDomain, - Location: request.Location, - Namespace: &namespace, - RetriedAt: request.RetriedAt, - RetryCount: request.RetryCount, - SenderPublicKey: request.SenderPublicKey, - TargetNode: request.TargetNode, - TotalBytes: int64(numBytes), - TriggerSubjects: request.TriggerSubjects, - WorkloadJwt: request.WorkloadJWT, - WorkloadName: request.WorkloadName, - WorkloadType: request.WorkloadType, +func agentDeployRequestFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo { + return &agentapi.AgentWorkloadInfo{ + Argv: request.Argv, + DecodedClaims: request.DecodedClaims, + Description: request.Description, + Environment: request.WorkloadEnvironment, + Essential: request.Essential, + Hash: hash, + HostServicesConfig: request.HostServicesConfig, + ID: request.ID, + Location: request.Location, + Namespace: &namespace, + RetriedAt: request.RetriedAt, + RetryCount: request.RetryCount, + TotalBytes: int64(numBytes), + TriggerSubjects: request.TriggerSubjects, + WorkloadName: request.WorkloadName, + WorkloadType: request.WorkloadType, } } diff --git a/internal/node/processmanager/firecracker_procman.go b/internal/node/processmanager/firecracker_procman.go index d23a2359..e0ae7477 100644 --- a/internal/node/processmanager/firecracker_procman.go +++ b/internal/node/processmanager/firecracker_procman.go @@ -39,7 +39,7 @@ type FirecrackerProcessManager struct { natsint *internalnats.InternalNatsServer delegate ProcessDelegate - deployRequests map[string]*agentapi.DeployRequest + deployRequests map[string]*agentapi.AgentWorkloadInfo } func NewFirecrackerProcessManager( @@ -61,7 +61,7 @@ func NewFirecrackerProcessManager( allVMs: make(map[string]*runningFirecracker), poolVMs: make(chan *runningFirecracker, config.MachinePoolSize), stopMutex: make(map[string]*sync.Mutex), - deployRequests: make(map[string]*agentapi.DeployRequest), + deployRequests: make(map[string]*agentapi.AgentWorkloadInfo), }, nil } @@ -95,7 +95,7 @@ func (f *FirecrackerProcessManager) EnterLameDuck() error { } // Preparing a workload reads from the warmVMs channel -func (f *FirecrackerProcessManager) PrepareWorkload(workloadId string, deployRequest *agentapi.DeployRequest) error { +func (f *FirecrackerProcessManager) PrepareWorkload(workloadId string, deployRequest *agentapi.AgentWorkloadInfo) error { f.mutex.Lock() defer f.mutex.Unlock() diff --git a/internal/node/processmanager/process_mgr.go b/internal/node/processmanager/process_mgr.go index aeb1d256..de6dca91 100644 --- a/internal/node/processmanager/process_mgr.go +++ b/internal/node/processmanager/process_mgr.go @@ -10,7 +10,7 @@ const runloopSleepInterval = 100 * time.Millisecond // Information about an agent process without regard to the implementation of the agent process manager type ProcessInfo struct { - DeployRequest *agentapi.DeployRequest + DeployRequest *agentapi.AgentWorkloadInfo ID string Name string Namespace string @@ -37,7 +37,7 @@ type ProcessManager interface { // Associate a deploy request with the given workload id, and perform any // just in time initialization of resources if necessary - PrepareWorkload(id string, request *agentapi.DeployRequest) error + PrepareWorkload(id string, request *agentapi.AgentWorkloadInfo) error // Start the process manager and allocate a pool of agents based on an implementation-specific // strategy, delegating callbacks to the given delegate diff --git a/internal/node/processmanager/running_vm.go b/internal/node/processmanager/running_vm.go index 9f056b05..58b5dfd8 100644 --- a/internal/node/processmanager/running_vm.go +++ b/internal/node/processmanager/running_vm.go @@ -32,7 +32,7 @@ type runningFirecracker struct { closing uint32 config *nexmodels.NodeConfiguration - deployRequest *agentapi.DeployRequest + deployRequest *agentapi.AgentWorkloadInfo ip net.IP log *slog.Logger machine *firecracker.Machine diff --git a/internal/node/processmanager/spawn_procman.go b/internal/node/processmanager/spawn_procman.go index 9ae367f6..65088ad6 100644 --- a/internal/node/processmanager/spawn_procman.go +++ b/internal/node/processmanager/spawn_procman.go @@ -38,14 +38,14 @@ type SpawningProcessManager struct { natsint *internalnats.InternalNatsServer delegate ProcessDelegate - deployRequests map[string]*agentapi.DeployRequest + deployRequests map[string]*agentapi.AgentWorkloadInfo log *slog.Logger } type spawnedProcess struct { cmd *exec.Cmd - deployRequest *agentapi.DeployRequest + deployRequest *agentapi.AgentWorkloadInfo workloadStarted time.Time ID string @@ -74,7 +74,7 @@ func NewSpawningProcessManager( stopMutexes: make(map[string]*sync.Mutex), - deployRequests: make(map[string]*agentapi.DeployRequest), + deployRequests: make(map[string]*agentapi.AgentWorkloadInfo), allProcs: make(map[string]*spawnedProcess), poolProcs: make(chan *spawnedProcess, config.MachinePoolSize), }, nil @@ -110,7 +110,7 @@ func (s *SpawningProcessManager) EnterLameDuck() error { } // Attaches a deployment request to a running process. Until a process is prepared, it's just an empty agent -func (s *SpawningProcessManager) PrepareWorkload(workloadID string, deployRequest *agentapi.DeployRequest) error { +func (s *SpawningProcessManager) PrepareWorkload(workloadID string, deployRequest *agentapi.AgentWorkloadInfo) error { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/internal/node/workload_mgr.go b/internal/node/workload_mgr.go index 52be256a..68acb6bb 100644 --- a/internal/node/workload_mgr.go +++ b/internal/node/workload_mgr.go @@ -216,7 +216,7 @@ func (m *WorkloadManager) CacheWorkload(workloadID string, request *controlapi.D // Deploy a workload as specified by the given deploy request to an available // agent in the configured pool -func (w *WorkloadManager) DeployWorkload(agentClient *agentapi.AgentClient, request *agentapi.DeployRequest) error { +func (w *WorkloadManager) DeployWorkload(agentClient *agentapi.AgentClient, request *agentapi.AgentWorkloadInfo) error { w.poolMutex.Lock() defer w.poolMutex.Unlock() @@ -318,7 +318,7 @@ func (w *WorkloadManager) DeployWorkload(agentClient *agentapi.AgentClient, requ // Locates a given workload by its workload ID and returns the deployment request associated with it // Note that this means "pending" agents are not considered by lookups -func (w *WorkloadManager) LookupWorkload(workloadID string) (*agentapi.DeployRequest, error) { +func (w *WorkloadManager) LookupWorkload(workloadID string) (*agentapi.AgentWorkloadInfo, error) { if agentClient, ok := w.liveAgents[workloadID]; ok { return agentClient.DeployRequest(), nil } @@ -532,7 +532,7 @@ func (w *WorkloadManager) agentContactLost(workloadID string) { } // Generate a NATS subscriber function that is used to trigger function-type workloads -func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string, request *agentapi.DeployRequest) func(msg *nats.Msg) { +func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string, request *agentapi.AgentWorkloadInfo) func(msg *nats.Msg) { agentClient, ok := w.liveAgents[workloadID] if !ok { w.log.Error("Attempted to generate trigger handler for non-existent agent client") @@ -632,7 +632,7 @@ func (w *WorkloadManager) startInternalNATS() error { return nil } -func (w *WorkloadManager) createHostServicesConnection(request *agentapi.DeployRequest) (*nats.Conn, error) { +func (w *WorkloadManager) createHostServicesConnection(request *agentapi.AgentWorkloadInfo) (*nats.Conn, error) { natsOpts := []nats.Option{ nats.Name("nex-hostservices"), } diff --git a/internal/node/workload_mgr_events.go b/internal/node/workload_mgr_events.go index fa21ef8b..c2849bd9 100644 --- a/internal/node/workload_mgr_events.go +++ b/internal/node/workload_mgr_events.go @@ -15,16 +15,17 @@ import ( ) func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) { - deployRequest, _ := w.LookupWorkload(agentId) - if deployRequest == nil { + agentWorkloadInfo, _ := w.LookupWorkload(agentId) + if agentWorkloadInfo == nil { // got an event from a process that doesn't yet have a workload (deployment request) associated // with it return } - evt.SetSource(fmt.Sprintf("%s-%s", *deployRequest.TargetNode, agentId)) - evt.SetExtension(controlapi.EventExtensionNamespace, *deployRequest.Namespace) - err := PublishCloudEvent(w.nc, *deployRequest.Namespace, evt, w.log) + evt.SetSource(fmt.Sprintf("%s-%s", w.publicKey, agentId)) + evt.SetExtension(controlapi.EventExtensionNamespace, *agentWorkloadInfo.Namespace) + + err := PublishCloudEvent(w.nc, *agentWorkloadInfo.Namespace, evt, w.log) if err != nil { w.log.Error("Failed to publish cloudevent", slog.Any("err", err)) return @@ -46,22 +47,22 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) { return } - if deployRequest.IsEssential() && workloadStatus.Code != 0 { + if agentWorkloadInfo.IsEssential() && workloadStatus.Code != 0 { w.log.Debug("Essential workload stopped with non-zero exit code", - slog.String("namespace", *deployRequest.Namespace), - slog.String("workload", *deployRequest.WorkloadName), + slog.String("namespace", *agentWorkloadInfo.Namespace), + slog.String("workload", *agentWorkloadInfo.WorkloadName), slog.String("workload_id", agentId), - slog.String("workload_type", string(deployRequest.WorkloadType))) + slog.String("workload_type", string(agentWorkloadInfo.WorkloadType))) - if deployRequest.RetryCount == nil { + if agentWorkloadInfo.RetryCount == nil { retryCount := uint(0) - deployRequest.RetryCount = &retryCount + agentWorkloadInfo.RetryCount = &retryCount } - *deployRequest.RetryCount += 1 + *agentWorkloadInfo.RetryCount += 1 retriedAt := time.Now().UTC() - deployRequest.RetriedAt = &retriedAt + agentWorkloadInfo.RetriedAt = &retriedAt // generate a new uuid for this deploy request reqUUID, err := uuid.NewRandom() @@ -77,13 +78,13 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) { return } - bucket, err := js.ObjectStore(deployRequest.Location.Hostname()) + bucket, err := js.ObjectStore(agentWorkloadInfo.Location.Hostname()) if err != nil { w.log.Error("Failed to resolve workload object store", slog.Any("err", err)) return } - artifact := deployRequest.Location.Path[1:len(deployRequest.Location.Path)] + artifact := agentWorkloadInfo.Location.Path[1:len(agentWorkloadInfo.Location.Path)] info, err := bucket.GetInfo(artifact) if err != nil { @@ -93,26 +94,21 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) { digest := controlapi.SanitizeNATSDigest(info.Digest) req, _ := json.Marshal(&controlapi.DeployRequest{ - Argv: deployRequest.Argv, - Description: deployRequest.Description, + Argv: agentWorkloadInfo.Argv, + Description: agentWorkloadInfo.Description, Hash: &digest, - Environment: deployRequest.EncryptedEnvironment, - Essential: deployRequest.Essential, + Essential: agentWorkloadInfo.Essential, ID: &id, - JsDomain: deployRequest.JsDomain, - Location: deployRequest.Location, - RetriedAt: deployRequest.RetriedAt, - RetryCount: deployRequest.RetryCount, - SenderPublicKey: deployRequest.SenderPublicKey, - TargetNode: deployRequest.TargetNode, - TriggerSubjects: deployRequest.TriggerSubjects, - WorkloadName: deployRequest.WorkloadName, - WorkloadJWT: deployRequest.WorkloadJwt, - WorkloadType: deployRequest.WorkloadType, + Location: agentWorkloadInfo.Location, + RetriedAt: agentWorkloadInfo.RetriedAt, + RetryCount: agentWorkloadInfo.RetryCount, + TriggerSubjects: agentWorkloadInfo.TriggerSubjects, + WorkloadName: agentWorkloadInfo.WorkloadName, + WorkloadType: agentWorkloadInfo.WorkloadType, }) nodeID := w.publicKey - subject := fmt.Sprintf("%s.DEPLOY.%s.%s", controlapi.APIPrefix, *deployRequest.Namespace, nodeID) + subject := fmt.Sprintf("%s.DEPLOY.%s.%s", controlapi.APIPrefix, *agentWorkloadInfo.Namespace, nodeID) _, err = w.nc.Request(subject, req, time.Millisecond*2500) if err != nil { w.log.Error("Failed to redeploy essential workload", slog.Any("err", err)) diff --git a/test/wasm_test.go b/test/wasm_test.go index 2f3e52b8..f540da13 100644 --- a/test/wasm_test.go +++ b/test/wasm_test.go @@ -13,7 +13,7 @@ func TestWasmExecution(t *testing.T) { file := "../examples/wasm/echofunction/echofunction.wasm" typ := controlapi.NexWorkloadWasm params := &agentapi.ExecutionProviderParams{ - DeployRequest: agentapi.DeployRequest{ + AgentWorkloadInfo: agentapi.AgentWorkloadInfo{ Environment: map[string]string{}, Hash: "", TotalBytes: 0, @@ -34,7 +34,7 @@ func TestWasmExecution(t *testing.T) { NATSConn: nil, // FIXME } - params.DeployRequest.WorkloadType = typ + params.AgentWorkloadInfo.WorkloadType = typ wasm, err := lib.InitNexExecutionProviderWasm(params) if err != nil { t.Fatalf("Failed to instantiate wasm provider: %s", err) From 20a6a5cd793429abb8e4eebd1ed8a770742e305a Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Fri, 2 Aug 2024 07:52:31 -0400 Subject: [PATCH 2/4] a few more renames --- internal/agent-api/client.go | 10 +++++----- internal/agent-api/types.go | 18 +++++++++--------- internal/node/workload_mgr.go | 6 +++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/internal/agent-api/client.go b/internal/agent-api/client.go index 213052b5..9d067edb 100644 --- a/internal/agent-api/client.go +++ b/internal/agent-api/client.go @@ -57,7 +57,7 @@ type AgentClient struct { execTotalNanos int64 workloadStartedAt time.Time - deployRequest *AgentWorkloadInfo + workloadInfo *AgentWorkloadInfo workloadBytes uint64 subz []*nats.Subscription deployRetryCount uint @@ -169,7 +169,7 @@ func (a *AgentClient) DeployWorkload(request *AgentWorkloadInfo) (*DeployRespons return nil, err } - a.deployRequest = request + a.workloadInfo = request a.workloadStartedAt = time.Now().UTC() a.workloadBytes = uint64(request.TotalBytes) return &deployResponse, nil @@ -258,8 +258,8 @@ func (a *AgentClient) UptimeMillis() time.Duration { return time.Since(a.workloadStartedAt) } -func (a *AgentClient) DeployRequest() *AgentWorkloadInfo { - return a.deployRequest +func (a *AgentClient) WorkloadInfo() *AgentWorkloadInfo { + return a.workloadInfo } func (a *AgentClient) IsSelected() bool { @@ -294,7 +294,7 @@ func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, subje } func (a *AgentClient) WorkloadType() controlapi.NexWorkload { - return a.deployRequest.WorkloadType + return a.workloadInfo.WorkloadType } func (a *AgentClient) awaitHandshake(agentID string) { diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index 5d08e9f4..eee99394 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -73,21 +73,21 @@ type AgentWorkloadInfo struct { Errors []error `json:"errors,omitempty"` } -func (request *AgentWorkloadInfo) IsEssential() bool { - return request.Essential != nil && *request.Essential +func (info *AgentWorkloadInfo) IsEssential() bool { + return info.Essential != nil && *info.Essential } // Returns true if the run request supports essential flag -func (request *AgentWorkloadInfo) SupportsEssential() bool { - return request.WorkloadType == controlapi.NexWorkloadNative || - request.WorkloadType == controlapi.NexWorkloadOCI +func (info *AgentWorkloadInfo) SupportsEssential() bool { + return info.WorkloadType == controlapi.NexWorkloadNative || + info.WorkloadType == controlapi.NexWorkloadOCI } // Returns true if the run request supports trigger subjects -func (request *AgentWorkloadInfo) SupportsTriggerSubjects() bool { - return (request.WorkloadType == controlapi.NexWorkloadV8 || - request.WorkloadType == controlapi.NexWorkloadWasm) && - len(request.TriggerSubjects) > 0 +func (info *AgentWorkloadInfo) SupportsTriggerSubjects() bool { + return (info.WorkloadType == controlapi.NexWorkloadV8 || + info.WorkloadType == controlapi.NexWorkloadWasm) && + len(info.TriggerSubjects) > 0 } func (r *AgentWorkloadInfo) Validate() error { diff --git a/internal/node/workload_mgr.go b/internal/node/workload_mgr.go index 68acb6bb..0fdcf2c8 100644 --- a/internal/node/workload_mgr.go +++ b/internal/node/workload_mgr.go @@ -222,7 +222,7 @@ func (w *WorkloadManager) DeployWorkload(agentClient *agentapi.AgentClient, requ if w.config.AllowDuplicateWorkloads != nil && !*w.config.AllowDuplicateWorkloads { for _, agentClient := range w.liveAgents { - if strings.EqualFold(agentClient.DeployRequest().Hash, request.Hash) { + if strings.EqualFold(agentClient.WorkloadInfo().Hash, request.Hash) { w.log.Warn("Attempted to deploy duplicate workload", slog.String("workload_name", *request.WorkloadName), slog.String("workload_type", string(request.WorkloadType)), @@ -320,7 +320,7 @@ func (w *WorkloadManager) DeployWorkload(agentClient *agentapi.AgentClient, requ // Note that this means "pending" agents are not considered by lookups func (w *WorkloadManager) LookupWorkload(workloadID string) (*agentapi.AgentWorkloadInfo, error) { if agentClient, ok := w.liveAgents[workloadID]; ok { - return agentClient.DeployRequest(), nil + return agentClient.WorkloadInfo(), nil } return nil, fmt.Errorf("workload doesn't exist: %s", workloadID) @@ -331,7 +331,7 @@ func (w *WorkloadManager) RunningWorkloads() ([]controlapi.MachineSummary, error summaries := make([]controlapi.MachineSummary, 0) for id, agentClient := range w.liveAgents { - deployRequest := agentClient.DeployRequest() + deployRequest := agentClient.WorkloadInfo() uptimeFriendly := myUptime(agentClient.UptimeMillis()) runtimeFriendly := "--" From f436ef5099048db24cd3b8715f8a392175a57c2d Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Fri, 2 Aug 2024 07:55:27 -0400 Subject: [PATCH 3/4] another rename --- internal/node/controlapi.go | 2 +- internal/node/node.go | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/internal/node/controlapi.go b/internal/node/controlapi.go index 6e7b2d0f..59c12f3f 100644 --- a/internal/node/controlapi.go +++ b/internal/node/controlapi.go @@ -386,7 +386,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na return } - agentDeployRequest := agentDeployRequestFromControlDeployRequest(&request, namespace, numBytes, *workloadHash) + agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(&request, namespace, numBytes, *workloadHash) api.log. Info("Submitting workload to agent", diff --git a/internal/node/node.go b/internal/node/node.go index 110164e5..861518ca 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -445,7 +445,7 @@ func (n *Node) handleAutostarts() { continue } - agentDeployRequest := agentDeployRequestFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash) + agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash) agentDeployRequest.TotalBytes = int64(numBytes) agentDeployRequest.Hash = *workloadHash @@ -693,9 +693,7 @@ func (n *Node) shuttingDown() bool { return (atomic.LoadUint32(&n.closing) > 0) } -// For the curious - I tried a number of ways of making a common/shared deploy request and only modeling the differences -// here, but it made all the code that creates deployment requests look hideous. Will look into cleaning this up again. -func agentDeployRequestFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo { +func agentWorkloadInfoFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo { return &agentapi.AgentWorkloadInfo{ Argv: request.Argv, DecodedClaims: request.DecodedClaims, From 5fbbf6d32520bf3151dd6fc8a0d2deb3a426e1dc Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Fri, 2 Aug 2024 11:13:48 -0400 Subject: [PATCH 4/4] renaming more variables --- agent/agent.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 3d4b8e4d..6a52e2d5 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -260,8 +260,8 @@ func (a *Agent) dispatchLogs() { // bucket, write it to tmp, initialize the execution provider per the // request, and then validate and deploy a workload func (a *Agent) handleDeploy(m *nats.Msg) { - var request agentapi.AgentWorkloadInfo - err := json.Unmarshal(m.Data, &request) + var info agentapi.AgentWorkloadInfo + err := json.Unmarshal(m.Data, &info) if err != nil { msg := fmt.Sprintf("Failed to unmarshal deploy request: %s", err) a.submitLog(msg, slog.LevelError) @@ -269,19 +269,19 @@ func (a *Agent) handleDeploy(m *nats.Msg) { return } - err = request.Validate() + err = info.Validate() if err != nil { _ = a.workAck(m, false, fmt.Sprintf("%v", err)) // FIXME-- this message can be formatted prettier return } - tmpFile, err := a.cacheExecutableArtifact(&request) + tmpFile, err := a.cacheExecutableArtifact(&info) if err != nil { _ = a.workAck(m, false, err.Error()) return } - params, err := a.newExecutionProviderParams(&request, *tmpFile) + params, err := a.newExecutionProviderParams(&info, *tmpFile) if err != nil { _ = a.workAck(m, false, err.Error()) return @@ -297,7 +297,7 @@ func (a *Agent) handleDeploy(m *nats.Msg) { a.provider = provider shouldValidate := true - if !a.sandboxed && request.WorkloadType == controlapi.NexWorkloadNative { + if !a.sandboxed && info.WorkloadType == controlapi.NexWorkloadNative { shouldValidate = false } @@ -455,19 +455,19 @@ func (a *Agent) installSignalHandlers() { // newExecutionProviderParams initializes new execution provider params // for the given work request and starts a goroutine listening -func (a *Agent) newExecutionProviderParams(req *agentapi.AgentWorkloadInfo, tmpFile string) (*agentapi.ExecutionProviderParams, error) { +func (a *Agent) newExecutionProviderParams(info *agentapi.AgentWorkloadInfo, tmpFile string) (*agentapi.ExecutionProviderParams, error) { if a.md.VmID == nil { return nil, errors.New("vm id is required to initialize execution provider params") } - if req.WorkloadName == nil { + if info.WorkloadName == nil { return nil, errors.New("workload name is required to initialize execution provider params") } params := &agentapi.ExecutionProviderParams{ - AgentWorkloadInfo: *req, - Stderr: &logEmitter{stderr: true, name: *req.WorkloadName, logs: a.agentLogs}, - Stdout: &logEmitter{stderr: false, name: *req.WorkloadName, logs: a.agentLogs}, + AgentWorkloadInfo: *info, + Stderr: &logEmitter{stderr: true, name: *info.WorkloadName, logs: a.agentLogs}, + Stdout: &logEmitter{stderr: false, name: *info.WorkloadName, logs: a.agentLogs}, TmpFilename: &tmpFile, VmID: *a.md.VmID, @@ -476,7 +476,7 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.AgentWorkloadInfo, tmpF Exit: make(chan int), NATSConn: a.nc, - TriggerSubjects: req.TriggerSubjects, + TriggerSubjects: info.TriggerSubjects, PluginPath: a.md.PluginPath, }