Improve stability (#328)
1. **Connection and Logging Fixes:**
   - Fixed connection name for triggers.
   - Fixed mismatched logger values.
   - Updated logger values and added trace levels (see the sketch after this list).
   - Improved logging around autostart artifact resolution.
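
The standard `log/slog` package stops at Debug/Info/Warn/Error, so the trace level mentioned above has to be a custom level. A minimal sketch of the conventional pattern, assuming a level defined below `slog.LevelDebug` (the constant name and value are illustrative, not taken from this commit):

```go
package main

import (
	"context"
	"log/slog"
	"os"
)

// slog's built-in levels are integers spaced four apart (Debug is -4),
// so a trace level is conventionally defined one step below Debug.
const LevelTrace = slog.Level(-8)

func main() {
	// The handler has to be told to let the custom level through,
	// otherwise trace records are dropped at the default Info threshold.
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
		Level: LevelTrace,
	}))

	// There is no logger.Trace helper, so trace entries go through Log.
	logger.Log(context.Background(), LevelTrace, "resolving autostart artifact",
		slog.String("artifact", "example"))
}
```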

2. **Agent and Workload Improvements:**
   - Closed the signal channel when shutting down the agent.
   - Ensured the agent provider's undeploy method is called only once (sketched after this list).
   - Added logging for object download time.
   - Removed `--name` parameter from workload stop requests.
   - Improved agent deploy request retries and auto-start configuration handling.
   - Stopped failing the stats read when `/proc/meminfo` is missing.
   - Fixed host services configuration test.
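
The "called only once" guarantee shows up in the `agent/agent.go` diff below as an `atomic.Bool` that is checked before the provider is invoked. Reduced to a runnable sketch (the `agent` and `provider` types here are stand-ins, not the real ones):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// provider stands in for the agent's workload execution provider.
type provider struct{}

func (p *provider) Undeploy() error {
	fmt.Println("undeploying workload")
	return nil
}

type agent struct {
	provider    *provider
	undeploying atomic.Bool
}

// handleUndeploy lets only the first request through; any later request
// (or the shutdown path) sees the flag already set and skips the call.
func (a *agent) handleUndeploy() {
	if a.undeploying.Load() {
		fmt.Println("undeploy already in progress, ignoring request")
		return
	}
	a.undeploying.Store(true)

	if err := a.provider.Undeploy(); err != nil {
		fmt.Printf("failed to undeploy workload: %s\n", err)
	}
}

func main() {
	a := &agent{provider: &provider{}}
	a.handleUndeploy()
	a.handleUndeploy() // second call is a no-op
}
```

A `CompareAndSwap(false, true)` would close the small window between the load and the store; the load/store pair above simply mirrors the shape of the change in `handleUndeploy` and `shutdown` in the diff.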

3. **Configuration and Utility Enhancements:**
   - Updated UUID usage.
   - Fixed internal configuration template.
   - Enabled shared namespace bucket.
   - Added essential fields in workload deployed status events.
   - Added `allow_duplicate_workloads` tag to node configuration (see the sketch after this list).
   - Omitted empty `allow_duplicate_workloads` flag in control API structs.
   - Added `SanitizeNATSDigest` utility function.
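
The two `allow_duplicate_workloads` items describe a boolean that is only serialized when it is actually set. A hedged sketch of how such a field is typically declared (the struct name, field placement, and use of a `*bool` are illustrative assumptions, not the actual control API types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeConfig is a stand-in for the node configuration struct; only the
// new field is shown. A *bool plus omitempty means an unset flag is
// dropped from the JSON entirely rather than emitted as false.
type nodeConfig struct {
	AllowDuplicateWorkloads *bool `json:"allow_duplicate_workloads,omitempty"`
}

func main() {
	unset, _ := json.Marshal(nodeConfig{})
	fmt.Println(string(unset)) // {}

	yes := true
	set, _ := json.Marshal(nodeConfig{AllowDuplicateWorkloads: &yes})
	fmt.Println(string(set)) // {"allow_duplicate_workloads":true}
}
```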

4. **Miscellaneous Updates:**
   - Updated comments and cleaned up code.
   - Renamed `slog` handler module.
   - Added nil protection around artifact deletion (sketched after this list).
   - Refactored node sandbox specs to match Linux specs.
   - Renamed `process_mgr_nofc.go` to `process_mgr_nosandbox.go`.
   - Updated upstream Firecracker SDK and handled security findings.
   - Cleaned up whitespace and fixed test specifications.
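
The nil protection around artifact deletion appears in the `deleteExecutableArtifact` hunk further down; stripped to its essentials, the guard just checks the cache-bucket handle before touching it. The types and file name below are placeholders, not the real NATS object-store client:

```go
package main

import (
	"fmt"
	"os"
	"path"
)

// bucket stands in for the object-store handle, which may never have
// been initialized if the agent failed early in startup.
type bucket struct{}

func (b *bucket) Delete(key string) error {
	fmt.Println("deleting cached artifact", key)
	return nil
}

type agent struct {
	cacheBucket *bucket
	vmID        string
}

// deleteExecutableArtifact removes the temp file unconditionally but
// only touches the bucket when the handle is non-nil, so a half-started
// agent can still shut down cleanly.
func (a *agent) deleteExecutableArtifact() error {
	tempFile := path.Join(os.TempDir(), "workload-"+a.vmID)
	_ = os.Remove(tempFile)

	if a.cacheBucket != nil {
		_ = a.cacheBucket.Delete(a.vmID)
	}
	return nil
}

func main() {
	a := &agent{vmID: "vm-123"} // cacheBucket deliberately left nil
	_ = a.deleteExecutableArtifact()
	fmt.Println("shutdown continued without a panic")
}
```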

5. **Bug Fixes and Reversions:**
   - Removed force preflight.
   - Fixed build issues and made function signatures consistent across success/failure methods.
   - Published workload undeploy events before purging cached deploy requests.
   - Stopped the running workload on agent connection loss.
jordan-rash authored Jul 31, 2024
1 parent c90e457 commit a3d8b74
Showing 49 changed files with 886 additions and 636 deletions.
4 changes: 4 additions & 0 deletions .goreleaser.yaml
@@ -16,6 +16,8 @@ builds:
goarch:
- amd64
- arm64
flags:
- -trimpath
ldflags:
- -s -w --X main.VERSION={{.Version}} -X main.COMMIT={{.Commit}} -X main.BUILDDATE={{.Date}} -X github.com/synadia-io/nex/internal/node.VERSION={{.Version}} -X github.com/synadia-io/nex/internal/node.COMMIT={{.Commit}} -X github.com/synadia-io/nex/internal/node.BUILDDATE={{.Date}}
- -extldflags "-static"
@@ -32,6 +34,8 @@ builds:
goarch:
- amd64
- arm64
flags:
- -trimpath
ldflags:
- -s -w --X github.com/synadia-io/nex/agent.VERSION={{.Version}} -X github.com/synadia-io/nex/agent.COMMIT={{.Commit}} -X github.com/synadia-io/nex/agent.BUILDDATE={{.Date}}
- -extldflags "-static"
89 changes: 54 additions & 35 deletions agent/agent.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"path"
@@ -30,7 +31,7 @@ const (
defaultAgentHandshakeTimeoutMillis = 500
runloopSleepInterval = 250 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
workloadExecutionSleepTimeoutMillis = 100
workloadExecutionSleepTimeoutMillis = 50
)

// Agent facilitates communication between the nex agent running in the firecracker VM
@@ -53,7 +54,8 @@ type Agent struct {
nc *nats.Conn
started time.Time

sandboxed bool
sandboxed bool
undeploying *atomic.Bool
}

// Initialize a new agent to facilitate communications with the host
@@ -80,12 +82,13 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
agentLogs: make(chan *agentapi.LogEntry, 64),
eventLogs: make(chan *cloudevents.Event, 64),
// sandbox defaults to true, only way to override that is with an explicit 'false'
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
md: metadata,
started: time.Now().UTC(),
subz: make([]*nats.Subscription, 0),
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
md: metadata,
started: time.Now().UTC(),
subz: make([]*nats.Subscription, 0),
undeploying: &atomic.Bool{},
}, nil
}

@@ -97,13 +100,13 @@ func (a *Agent) FullVersion() string {
// NOTE: agent process will request vm shutdown if this fails
func (a *Agent) Start() {
if !a.sandboxed {
a.LogDebug(fmt.Sprintf("Agent process running outside of sandbox; pid: %d", os.Getpid()))
a.submitLog(fmt.Sprintf("Agent process running outside of sandbox; pid: %d", os.Getpid()), slog.LevelDebug)
}

err := a.init()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to initialize agent: %s\n", err)
a.LogError(fmt.Sprintf("Agent process failed to initialize; %s", err.Error()))
a.submitLog(fmt.Sprintf("Agent process failed to initialize; %s", err.Error()), slog.LevelError)
a.shutdown()
}

@@ -115,7 +118,7 @@ func (a *Agent) Start() {
case <-timer.C:
// TODO: check NATS subscription statuses, etc.
case sig := <-a.sigs:
a.LogInfo(fmt.Sprintf("Received signal: %s", sig))
a.submitLog(fmt.Sprintf("Received signal: %s", sig), slog.LevelInfo)
a.shutdown()
case <-a.ctx.Done():
a.shutdown()
@@ -130,7 +133,7 @@ func (a *Agent) Start() {
// Request a handshake with the host indicating the agent is "all the way" up
// NOTE: the agent process will request a VM shutdown if this fails
func (a *Agent) requestHandshake() error {
a.LogInfo("Requesting handshake from host")
a.submitLog("Requesting handshake from host", slog.LevelDebug)
msg := agentapi.HandshakeRequest{
ID: a.md.VmID,
StartTime: a.started,
@@ -144,20 +147,20 @@ func (a *Agent) requestHandshake() error {

resp, err := a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
if err != nil {
a.LogError(fmt.Sprintf("Agent failed to request initial sync message: %s, attempt %d", err, attempts+1))
a.submitLog(fmt.Sprintf("Agent failed to request initial sync message: %s, attempt %d", err, attempts+1), slog.LevelError)
time.Sleep(time.Millisecond * 25)
continue
}

var handshakeResponse *agentapi.HandshakeResponse
err = json.Unmarshal(resp.Data, &handshakeResponse)
if err != nil {
a.LogError(fmt.Sprintf("Failed to parse handshake response: %s", err))
a.submitLog(fmt.Sprintf("Failed to parse handshake response: %s", err), slog.LevelError)
time.Sleep(time.Millisecond * 25)
continue
}

a.LogInfo(fmt.Sprintf("Agent is up after %d attempt(s)", attempts))
a.submitLog(fmt.Sprintf("Agent is up after %d attempt(s)", attempts), slog.LevelInfo)
return nil
}

Expand All @@ -183,14 +186,14 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e
err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.LogError(msg)
a.submitLog(msg, slog.LevelError)
return nil, errors.New(msg)
}

err = os.Chmod(tempFile, 0777)
if err != nil {
msg := fmt.Sprintf("Failed to set workload artifact as executable: %s", err)
a.LogError(msg)
a.submitLog(msg, slog.LevelError)
return nil, errors.New(msg)
}

Expand All @@ -204,7 +207,10 @@ func (a *Agent) deleteExecutableArtifact() error {
tempFile := path.Join(os.TempDir(), fileName)

_ = os.Remove(tempFile)
_ = a.cacheBucket.Delete(*a.md.VmID)

if a.cacheBucket != nil {
_ = a.cacheBucket.Delete(*a.md.VmID)
}

return nil
}
@@ -258,7 +264,7 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
err := json.Unmarshal(m.Data, &request)
if err != nil {
msg := fmt.Sprintf("Failed to unmarshal deploy request: %s", err)
a.LogError(msg)
a.submitLog(msg, slog.LevelError)
_ = a.workAck(m, false, msg)
return
}
@@ -284,7 +290,7 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
provider, err := providers.NewExecutionProvider(params)
if err != nil {
msg := fmt.Sprintf("Failed to initialize workload execution provider; %s", err)
a.LogError(msg)
a.submitLog(msg, slog.LevelError)
_ = a.workAck(m, false, msg)
return
}
@@ -299,32 +305,39 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
err = a.provider.Validate()
if err != nil {
msg := fmt.Sprintf("Failed to validate workload: %s", err)
a.LogError(msg)
a.submitLog(msg, slog.LevelError)
_ = a.workAck(m, false, msg)
return
}
}

err = a.provider.Deploy()
if err != nil {
a.LogError(fmt.Sprintf("Failed to deploy workload: %s", err))
a.submitLog(fmt.Sprintf("Failed to deploy workload: %s", err), slog.LevelError)
} else {
_ = a.workAck(m, true, "Workload deployed")
}
}

func (a *Agent) handleUndeploy(m *nats.Msg) {
if a.provider == nil {
a.LogDebug("Received undeploy workload request on agent without deployed workload")
a.submitLog("Received undeploy workload request on agent without deployed workload", slog.LevelDebug)
_ = m.Respond([]byte{})
return
}

if a.undeploying.Load() {
a.submitLog("Received additional undeploy workload request on agent", slog.LevelWarn)
return
}

a.undeploying.Store(true)

err := a.provider.Undeploy()
if err != nil {
// don't return an error here so worst-case scenario is an ungraceful shutdown,
// not a failure
a.LogError(fmt.Sprintf("Failed to undeploy workload: %s", err))
a.submitLog(fmt.Sprintf("Failed to undeploy workload: %s", err), slog.LevelError)
}

_ = m.Respond([]byte{})
@@ -352,36 +365,36 @@ func (a *Agent) init() error {

err := a.initNATS()
if err != nil {
a.LogError(fmt.Sprintf("Failed to initialize NATS connection: %s", err))
a.submitLog(fmt.Sprintf("Failed to initialize NATS connection: %s", err), slog.LevelError)
return err
}

err = a.requestHandshake()
if err != nil {
a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err))
a.submitLog(fmt.Sprintf("Failed to handshake with node: %s", err), slog.LevelError)
return err
}

subject := fmt.Sprintf("agentint.%s.deploy", *a.md.VmID)
sub, err := a.nc.Subscribe(subject, a.handleDeploy)
if err != nil {
a.LogError(fmt.Sprintf("Failed to subscribe to agent deploy subject: %s", err))
a.submitLog(fmt.Sprintf("Failed to subscribe to agent deploy subject: %s", err), slog.LevelError)
return err
}
a.subz = append(a.subz, sub)

udsubject := fmt.Sprintf("agentint.%s.undeploy", *a.md.VmID)
sub, err = a.nc.Subscribe(udsubject, a.handleUndeploy)
if err != nil {
a.LogError(fmt.Sprintf("Failed to subscribe to agent undeploy subject: %s", err))
a.submitLog(fmt.Sprintf("Failed to subscribe to agent undeploy subject: %s", err), slog.LevelError)
return err
}
a.subz = append(a.subz, sub)

pingSubject := fmt.Sprintf("agentint.%s.ping", *a.md.VmID)
sub, err = a.nc.Subscribe(pingSubject, a.handlePing)
if err != nil {
a.LogError(fmt.Sprintf("failed to subscribe to ping subject: %s", err))
a.submitLog(fmt.Sprintf("failed to subscribe to ping subject: %s", err), slog.LevelError)
}
a.subz = append(a.subz, sub)

@@ -476,11 +489,14 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile
msg := fmt.Sprintf("Failed to start workload: %s; vm: %s", *params.WorkloadName, params.VmID)
a.PublishWorkloadExited(params.VmID, *params.WorkloadName, msg, true, -1)
return

case <-params.Run:
a.PublishWorkloadDeployed(params.VmID, *params.WorkloadName, params.TotalBytes)
sleepMillis = workloadExecutionSleepTimeoutMillis
essential := false
if params.Essential != nil {
essential = *params.Essential
}

a.PublishWorkloadDeployed(params.VmID, *params.WorkloadName, essential, params.TotalBytes)
sleepMillis = workloadExecutionSleepTimeoutMillis
case exit := <-params.Exit:
msg := fmt.Sprintf("Exited workload: %s; vm: %s; status: %d", *params.WorkloadName, params.VmID, exit)
a.PublishWorkloadExited(params.VmID, *params.WorkloadName, msg, exit != 0, exit)
@@ -511,13 +527,16 @@ func (a *Agent) shutdown() {
}
}

if a.provider != nil {
if a.provider != nil && !a.undeploying.Load() {
err := a.provider.Undeploy()
if err != nil {
fmt.Printf("failed to undeploy workload: %s\n", err)
}
}

signal.Stop(a.sigs)
close(a.sigs)

HaltVM(nil)
}
}
@@ -526,7 +545,7 @@ func (a *Agent) shuttingDown() bool {
return (atomic.LoadUint32(&a.closing) > 0)
}

func (a *Agent) submitLog(msg string, lvl agentapi.LogLevel) {
func (a *Agent) submitLog(msg string, lvl slog.Level) {
a.agentLogs <- &agentapi.LogEntry{
Source: NexEventSourceNexAgent,
Level: lvl,
@@ -549,7 +568,7 @@ func (a *Agent) workAck(m *nats.Msg, accepted bool, msg string) error {

err = m.Respond(bytes)
if err != nil {
a.LogError(fmt.Sprintf("Failed to acknowledge workload deployment: %s", err))
a.submitLog(fmt.Sprintf("Failed to acknowledge workload deployment: %s", err), slog.LevelError)
return err
}

50 changes: 23 additions & 27 deletions agent/logger.go
@@ -2,7 +2,7 @@ package nexagent

import (
"fmt"
"os"
"log/slog"

agentapi "github.com/synadia-io/nex/internal/agent-api"
)
@@ -20,7 +20,7 @@ type logEmitter struct {

// Write arbitrary bytes to the underlying log emitter
func (l *logEmitter) Write(bytes []byte) (int, error) {
var lvl agentapi.LogLevel
var lvl slog.Level
if l.stderr {
lvl = agentapi.LogLevelError
} else {
@@ -37,36 +37,24 @@ func (l *logEmitter) Write(bytes []byte) (int, error) {
return len(bytes), nil
}

func (a *Agent) LogDebug(msg string) {
fmt.Fprintln(os.Stdout, msg)
if a.sandboxed {
a.submitLog(msg, agentapi.LogLevelDebug)
}
}

func (a *Agent) LogError(msg string) {
fmt.Fprintln(os.Stderr, msg)
if a.sandboxed {
a.submitLog(msg, agentapi.LogLevelError)
}
}

func (a *Agent) LogInfo(msg string) {
fmt.Fprintln(os.Stdout, msg)
if a.sandboxed {
a.submitLog(msg, agentapi.LogLevelInfo)
}
}

// FIXME-- revisit error handling
func (a *Agent) PublishWorkloadDeployed(vmID, workloadName string, totalBytes int64) {
func (a *Agent) PublishWorkloadDeployed(vmID, workloadName string, essential bool, totalBytes int64) {
a.agentLogs <- &agentapi.LogEntry{
Source: NexEventSourceNexAgent,
Level: agentapi.LogLevelInfo,
Text: fmt.Sprintf("Workload %s deployed", workloadName),
}

evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadDeployedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName})
evt := agentapi.NewAgentEvent(
vmID,
agentapi.WorkloadDeployedEventType,
agentapi.WorkloadStatusEvent{
Essential: &essential,
TotalBytes: &totalBytes,
WorkloadID: vmID,
WorkloadName: workloadName,
},
)
a.eventLogs <- &evt
}

@@ -86,10 +74,18 @@ func (a *Agent) PublishWorkloadExited(vmID, workloadName, message string, err bo

a.agentLogs <- &agentapi.LogEntry{
Source: NexEventSourceNexAgent,
Level: agentapi.LogLevel(level),
Level: level,
Text: txt,
}

evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadUndeployedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName, Code: code, Message: message})
evt := agentapi.NewAgentEvent(vmID,
agentapi.WorkloadUndeployedEventType,
agentapi.WorkloadStatusEvent{
WorkloadID: vmID,
WorkloadName: workloadName,
Code: code,
Message: message,
},
)
a.eventLogs <- &evt
}
