Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stops infinite recursion. attempts to clarify difference between the 2 deploy requests #337

Merged
merged 6 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (a *Agent) Version() string {
// the executable workload artifact from the cache bucket, write it to a
// temporary file and make it executable; this method returns the full
// path to the cached artifact if successful
func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, error) {
func (a *Agent) cacheExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)

Expand Down Expand Up @@ -260,7 +260,7 @@ func (a *Agent) dispatchLogs() {
// bucket, write it to tmp, initialize the execution provider per the
// request, and then validate and deploy a workload
func (a *Agent) handleDeploy(m *nats.Msg) {
var request agentapi.DeployRequest
var request agentapi.AgentWorkloadInfo
err := json.Unmarshal(m.Data, &request)
if err != nil {
msg := fmt.Sprintf("Failed to unmarshal deploy request: %s", err)
Expand Down Expand Up @@ -455,7 +455,7 @@ func (a *Agent) installSignalHandlers() {

// newExecutionProviderParams initializes new execution provider params
// for the given work request and starts a goroutine listening
func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile string) (*agentapi.ExecutionProviderParams, error) {
func (a *Agent) newExecutionProviderParams(req *agentapi.AgentWorkloadInfo, tmpFile string) (*agentapi.ExecutionProviderParams, error) {
if a.md.VmID == nil {
return nil, errors.New("vm id is required to initialize execution provider params")
}
Expand All @@ -465,11 +465,11 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile
}

params := &agentapi.ExecutionProviderParams{
DeployRequest: *req,
Stderr: &logEmitter{stderr: true, name: *req.WorkloadName, logs: a.agentLogs},
Stdout: &logEmitter{stderr: false, name: *req.WorkloadName, logs: a.agentLogs},
TmpFilename: &tmpFile,
VmID: *a.md.VmID,
AgentWorkloadInfo: *req,
Stderr: &logEmitter{stderr: true, name: *req.WorkloadName, logs: a.agentLogs},
Stdout: &logEmitter{stderr: false, name: *req.WorkloadName, logs: a.agentLogs},
TmpFilename: &tmpFile,
VmID: *a.md.VmID,

Fail: make(chan bool),
Run: make(chan bool),
Expand Down
2 changes: 1 addition & 1 deletion agent/providers/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestNoopPluginLoad(t *testing.T) {
plugPath := "../../test/fixtures"
wName := "echofunctionjs"
params := &agentapi.ExecutionProviderParams{
DeployRequest: agentapi.DeployRequest{
AgentWorkloadInfo: agentapi.AgentWorkloadInfo{
WorkloadName: &wName,
WorkloadType: "noop",
},
Expand Down
24 changes: 16 additions & 8 deletions internal/agent-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ContactLostCallback func(string)

const (
defaultAgentPingIntervalMillis = 5000
maxRetryAttempts = 3

NexTriggerSubject = "x-nex-trigger-subject"
NexRuntimeNs = "x-nex-runtime-ns"
Expand Down Expand Up @@ -56,9 +57,10 @@ type AgentClient struct {
execTotalNanos int64
workloadStartedAt time.Time

deployRequest *DeployRequest
workloadBytes uint64
subz []*nats.Subscription
workloadInfo *AgentWorkloadInfo
workloadBytes uint64
subz []*nats.Subscription
deployRetryCount uint

selected bool // FIXME-- rename...
}
Expand All @@ -82,6 +84,7 @@ func NewAgentClient(
handshakeSucceeded: onSuccess,
log: log,
logReceived: onLog,
deployRetryCount: 0,
nc: nc,
pingTimeout: pingTimeout,
subz: make([]*nats.Subscription, 0),
Expand Down Expand Up @@ -130,7 +133,11 @@ func (a *AgentClient) Start(agentID string) error {
return nil
}

func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, error) {
func (a *AgentClient) DeployWorkload(request *AgentWorkloadInfo) (*DeployResponse, error) {
if a.deployRetryCount > maxRetryAttempts {
return nil, fmt.Errorf("exceeded maximum number of agent workload deploy attempts: %d", maxRetryAttempts)
}

bytes, err := json.Marshal(request)
if err != nil {
return nil, err
Expand All @@ -148,6 +155,7 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e
return nil, errors.New("timed out waiting for acknowledgement of workload deployment")
} else if errors.Is(err, nats.ErrNoResponders) {
time.Sleep(time.Millisecond * 100)
a.deployRetryCount += 1
return a.DeployWorkload(request)
} else {
return nil, fmt.Errorf("failed to submit request for workload deployment: %s", err)
Expand All @@ -161,7 +169,7 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e
return nil, err
}

a.deployRequest = request
a.workloadInfo = request
a.workloadStartedAt = time.Now().UTC()
a.workloadBytes = uint64(request.TotalBytes)
return &deployResponse, nil
Expand Down Expand Up @@ -250,8 +258,8 @@ func (a *AgentClient) UptimeMillis() time.Duration {
return time.Since(a.workloadStartedAt)
}

func (a *AgentClient) DeployRequest() *DeployRequest {
return a.deployRequest
func (a *AgentClient) WorkloadInfo() *AgentWorkloadInfo {
return a.workloadInfo
}

func (a *AgentClient) IsSelected() bool {
Expand Down Expand Up @@ -286,7 +294,7 @@ func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, subje
}

func (a *AgentClient) WorkloadType() controlapi.NexWorkload {
return a.deployRequest.WorkloadType
return a.workloadInfo.WorkloadType
}

func (a *AgentClient) awaitHandshake(agentID string) {
Expand Down
33 changes: 14 additions & 19 deletions internal/agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const DefaultRunloopSleepTimeoutMillis = 25

// ExecutionProviderParams parameters for initializing a specific execution provider
type ExecutionProviderParams struct {
DeployRequest
AgentWorkloadInfo
TriggerSubjects []string `json:"trigger_subjects"`

// Fail channel receives bool upon command failing to start
Expand All @@ -45,8 +45,8 @@ type ExecutionProviderParams struct {
PluginPath *string `json:"-"`
}

// DeployRequest processed by the agent
type DeployRequest struct {
// AgentWorkloadInfo processed by the agent
type AgentWorkloadInfo struct {
Argv []string `json:"argv,omitempty"`
DecodedClaims jwt.GenericClaims `json:"-"`
Description *string `json:"description"`
Expand All @@ -68,34 +68,29 @@ type DeployRequest struct {
Stdout io.Writer `json:"-"`
TmpFilename *string `json:"-"`

EncryptedEnvironment *string `json:"-"`
JsDomain *string `json:"-"`
Location *url.URL `json:"-"`
SenderPublicKey *string `json:"-"`
TargetNode *string `json:"-"`
WorkloadJwt *string `json:"-"`
Location *url.URL `json:"-"`

Errors []error `json:"errors,omitempty"`
}

func (request *DeployRequest) IsEssential() bool {
return request.Essential != nil && *request.Essential
func (info *AgentWorkloadInfo) IsEssential() bool {
return info.Essential != nil && *info.Essential
}

// Returns true if the run request supports essential flag
func (request *DeployRequest) SupportsEssential() bool {
return request.WorkloadType == controlapi.NexWorkloadNative ||
request.WorkloadType == controlapi.NexWorkloadOCI
func (info *AgentWorkloadInfo) SupportsEssential() bool {
return info.WorkloadType == controlapi.NexWorkloadNative ||
info.WorkloadType == controlapi.NexWorkloadOCI
}

// Returns true if the run request supports trigger subjects
func (request *DeployRequest) SupportsTriggerSubjects() bool {
return (request.WorkloadType == controlapi.NexWorkloadV8 ||
request.WorkloadType == controlapi.NexWorkloadWasm) &&
len(request.TriggerSubjects) > 0
func (info *AgentWorkloadInfo) SupportsTriggerSubjects() bool {
return (info.WorkloadType == controlapi.NexWorkloadV8 ||
info.WorkloadType == controlapi.NexWorkloadWasm) &&
len(info.TriggerSubjects) > 0
}

func (r *DeployRequest) Validate() error {
func (r *AgentWorkloadInfo) Validate() error {
var err error

if r.Namespace == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na
return
}

agentDeployRequest := agentDeployRequestFromControlDeployRequest(&request, namespace, numBytes, *workloadHash)
agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(&request, namespace, numBytes, *workloadHash)

api.log.
Info("Submitting workload to agent",
Expand Down
45 changes: 19 additions & 26 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (n *Node) handleAutostarts() {
continue
}

agentDeployRequest := agentDeployRequestFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash)
agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash)
agentDeployRequest.TotalBytes = int64(numBytes)
agentDeployRequest.Hash = *workloadHash

Expand Down Expand Up @@ -693,30 +693,23 @@ func (n *Node) shuttingDown() bool {
return (atomic.LoadUint32(&n.closing) > 0)
}

// For the curious - I tried a number of ways of making a common/shared deploy request and only modeling the differences
// here, but it made all the code that creates deployment requests look hideous. Will look into cleaning this up again.
func agentDeployRequestFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.DeployRequest {
return &agentapi.DeployRequest{
Argv: request.Argv,
DecodedClaims: request.DecodedClaims,
Description: request.Description,
EncryptedEnvironment: request.Environment,
Environment: request.WorkloadEnvironment,
Essential: request.Essential,
Hash: hash,
HostServicesConfig: request.HostServicesConfig,
ID: request.ID,
JsDomain: request.JsDomain,
Location: request.Location,
Namespace: &namespace,
RetriedAt: request.RetriedAt,
RetryCount: request.RetryCount,
SenderPublicKey: request.SenderPublicKey,
TargetNode: request.TargetNode,
TotalBytes: int64(numBytes),
TriggerSubjects: request.TriggerSubjects,
WorkloadJwt: request.WorkloadJWT,
WorkloadName: request.WorkloadName,
WorkloadType: request.WorkloadType,
func agentWorkloadInfoFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo {
return &agentapi.AgentWorkloadInfo{
Argv: request.Argv,
DecodedClaims: request.DecodedClaims,
Description: request.Description,
Environment: request.WorkloadEnvironment,
Essential: request.Essential,
Hash: hash,
HostServicesConfig: request.HostServicesConfig,
ID: request.ID,
Location: request.Location,
Namespace: &namespace,
RetriedAt: request.RetriedAt,
RetryCount: request.RetryCount,
TotalBytes: int64(numBytes),
TriggerSubjects: request.TriggerSubjects,
WorkloadName: request.WorkloadName,
WorkloadType: request.WorkloadType,
}
}
6 changes: 3 additions & 3 deletions internal/node/processmanager/firecracker_procman.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type FirecrackerProcessManager struct {
natsint *internalnats.InternalNatsServer

delegate ProcessDelegate
deployRequests map[string]*agentapi.DeployRequest
deployRequests map[string]*agentapi.AgentWorkloadInfo
}

func NewFirecrackerProcessManager(
Expand All @@ -61,7 +61,7 @@ func NewFirecrackerProcessManager(
allVMs: make(map[string]*runningFirecracker),
poolVMs: make(chan *runningFirecracker, config.MachinePoolSize),
stopMutex: make(map[string]*sync.Mutex),
deployRequests: make(map[string]*agentapi.DeployRequest),
deployRequests: make(map[string]*agentapi.AgentWorkloadInfo),
}, nil
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func (f *FirecrackerProcessManager) EnterLameDuck() error {
}

// Preparing a workload reads from the warmVMs channel
func (f *FirecrackerProcessManager) PrepareWorkload(workloadId string, deployRequest *agentapi.DeployRequest) error {
func (f *FirecrackerProcessManager) PrepareWorkload(workloadId string, deployRequest *agentapi.AgentWorkloadInfo) error {
f.mutex.Lock()
defer f.mutex.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions internal/node/processmanager/process_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const runloopSleepInterval = 100 * time.Millisecond

// Information about an agent process without regard to the implementation of the agent process manager
type ProcessInfo struct {
DeployRequest *agentapi.DeployRequest
DeployRequest *agentapi.AgentWorkloadInfo
ID string
Name string
Namespace string
Expand All @@ -37,7 +37,7 @@ type ProcessManager interface {

// Associate a deploy request with the given workload id, and perform any
// just in time initialization of resources if necessary
PrepareWorkload(id string, request *agentapi.DeployRequest) error
PrepareWorkload(id string, request *agentapi.AgentWorkloadInfo) error

// Start the process manager and allocate a pool of agents based on an implementation-specific
// strategy, delegating callbacks to the given delegate
Expand Down
2 changes: 1 addition & 1 deletion internal/node/processmanager/running_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type runningFirecracker struct {

closing uint32
config *nexmodels.NodeConfiguration
deployRequest *agentapi.DeployRequest
deployRequest *agentapi.AgentWorkloadInfo
ip net.IP
log *slog.Logger
machine *firecracker.Machine
Expand Down
8 changes: 4 additions & 4 deletions internal/node/processmanager/spawn_procman.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ type SpawningProcessManager struct {
natsint *internalnats.InternalNatsServer

delegate ProcessDelegate
deployRequests map[string]*agentapi.DeployRequest
deployRequests map[string]*agentapi.AgentWorkloadInfo

log *slog.Logger
}

type spawnedProcess struct {
cmd *exec.Cmd
deployRequest *agentapi.DeployRequest
deployRequest *agentapi.AgentWorkloadInfo
workloadStarted time.Time

ID string
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewSpawningProcessManager(

stopMutexes: make(map[string]*sync.Mutex),

deployRequests: make(map[string]*agentapi.DeployRequest),
deployRequests: make(map[string]*agentapi.AgentWorkloadInfo),
allProcs: make(map[string]*spawnedProcess),
poolProcs: make(chan *spawnedProcess, config.MachinePoolSize),
}, nil
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *SpawningProcessManager) EnterLameDuck() error {
}

// Attaches a deployment request to a running process. Until a process is prepared, it's just an empty agent
func (s *SpawningProcessManager) PrepareWorkload(workloadID string, deployRequest *agentapi.DeployRequest) error {
func (s *SpawningProcessManager) PrepareWorkload(workloadID string, deployRequest *agentapi.AgentWorkloadInfo) error {
s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down
Loading
Loading