Skip to content

Commit

Permalink
extracts process management into separate interface (#155)
Browse files Browse the repository at this point in the history
* extracts process management into separate interface

* linty

* Cleanup code and audit for feature parity with pre-refactored state

* Fix broken tests

* Refactor agent id -> workload id

Comment out specs which no longer have access to internal data related
to running VMs..

* Rename workloadID -> agentID in AgentClient

* Use workload_id attribute in logs

* Use pool mutex when attempting to deploy workload

* Rename machine id -> id

* Drain subscriptions retained by agent client instances

* Fix linter and defer fix for broken handshake test

The commented test will be fixed in the PR including support for
no-sandbox mode.

* Cleanup string interpolation in slog calls

---------

Co-authored-by: kt <[email protected]>
  • Loading branch information
autodidaddict and kthomas authored Apr 3, 2024
1 parent 65d37a0 commit 3d386f2
Show file tree
Hide file tree
Showing 23 changed files with 1,457 additions and 1,027 deletions.
9 changes: 5 additions & 4 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ func (a *Agent) Start() {
// NOTE: the agent process will request a VM shutdown if this fails
func (a *Agent) requestHandshake() error {
msg := agentapi.HandshakeRequest{
MachineID: a.md.VmID,
ID: a.md.VmID,
StartTime: a.started,
Message: a.md.Message,
}
raw, _ := json.Marshal(msg)

resp, err := a.nc.Request(agentapi.NexAgentSubjectHandshake, raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
resp, err := a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
if err != nil {
a.LogError(fmt.Sprintf("Agent failed to request initial sync message: %s", err))
return err
Expand Down Expand Up @@ -247,8 +247,9 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
return
}

if !request.Validate() {
_ = a.workAck(m, false, fmt.Sprintf("%v", request.Errors)) // FIXME-- this message can be formatted prettier
err = request.Validate()
if err != nil {
_ = a.workAck(m, false, fmt.Sprintf("%v", err)) // FIXME-- this message can be formatted prettier
return
}

Expand Down
135 changes: 99 additions & 36 deletions internal/agent-api/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agentapi

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -12,70 +13,87 @@ import (

cloudevents "github.com/cloudevents/sdk-go"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

type HandshakeCallback func(string)
type EventCallback func(string, cloudevents.Event)
type LogCallback func(string, LogEntry)

const (
nexTriggerSubject = "x-nex-trigger-subject"
)

type AgentClient struct {
nc *nats.Conn
log *slog.Logger
agentId string
agentID string
handshakeTimeout time.Duration
handshakeReceived *atomic.Bool

handshakeTimedOut HandshakeCallback
handshakeSucceeded HandshakeCallback
eventReceived EventCallback
logReceived LogCallback

subz []*nats.Subscription
}

func NewAgentClient(nc *nats.Conn,
func NewAgentClient(
nc *nats.Conn,
log *slog.Logger,
handshakeTimeout time.Duration,
onTimedOut HandshakeCallback,
onSuccess HandshakeCallback,
onEvent EventCallback,
onLog LogCallback,
log *slog.Logger,
) *AgentClient {
shake := &atomic.Bool{}
shake.Store(false)
return &AgentClient{
nc: nc,
eventReceived: onEvent,
handshakeReceived: &atomic.Bool{},
handshakeTimeout: handshakeTimeout,
handshakeTimedOut: onTimedOut,
handshakeSucceeded: onSuccess,
eventReceived: onEvent,
log: log,
logReceived: onLog,

log: log,
handshakeReceived: shake,
nc: nc,
subz: make([]*nats.Subscription, 0),
}
}

func (a *AgentClient) Id() string {
return a.agentId
// Returns the ID of this agent client, which corresponds to a workload process identifier
func (a *AgentClient) ID() string {
return a.agentID
}

func (a *AgentClient) Start(agentId string) error {
a.agentId = agentId
_, err := a.nc.Subscribe("agentint.handshake", a.handleHandshake)
func (a *AgentClient) Start(agentID string) error {
a.log.Info("Agent client starting", slog.String("agent_id", agentID))
a.agentID = agentID

var sub *nats.Subscription
var err error

sub, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.handshake", agentID), a.handleHandshake)
if err != nil {
return err
}
a.subz = append(a.subz, sub)

_, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.events.*", agentId), a.handleAgentEvent)
sub, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.events.*", agentID), a.handleAgentEvent)
if err != nil {
return err
}
a.subz = append(a.subz, sub)

_, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.logs", agentId), a.handleAgentLog)
sub, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.logs", agentID), a.handleAgentLog)
if err != nil {
return err
}
a.subz = append(a.subz, sub)

go a.awaitHandshake(agentId)
go a.awaitHandshake(agentID)

return nil
}
Expand All @@ -88,10 +106,10 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e

status := a.nc.Status()
a.log.Debug("NATS internal connection status",
slog.String("agentId", a.agentId),
slog.String("agent_id", a.agentID),
slog.String("status", status.String()))

subject := fmt.Sprintf("agentint.%s.deploy", a.agentId)
subject := fmt.Sprintf("agentint.%s.deploy", a.agentID)
resp, err := a.nc.Request(subject, bytes, 1*time.Second)
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
Expand All @@ -110,33 +128,78 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e
return &deployResponse, nil
}

// Stop the agent client instance and cleanup by draining subscriptions
// and releasing other associated resources
func (a *AgentClient) Drain() error {
for _, sub := range a.subz {
err := sub.Drain()
if err != nil {
a.log.Warn("failed to drain subscription associated with agent client",
slog.String("subject", sub.Subject),
slog.String("agent_id", a.agentID),
slog.String("error", err.Error()),
)

// no-op for now, try the next one... perhaps we should return the error here in the future?
}

a.log.Debug("drained subscription associated with agent client",
slog.String("subject", sub.Subject),
slog.String("agent_id", a.agentID),
)
}

return nil
}

func (a *AgentClient) Undeploy() error {
subject := fmt.Sprintf("agentint.%s.undeploy", a.agentId)
subject := fmt.Sprintf("agentint.%s.undeploy", a.agentID)
_, err := a.nc.Request(subject, []byte{}, 500*time.Millisecond) // FIXME-- allow this timeout to be configurable... 500ms is likely not enough
if err != nil {
a.log.Warn("request to undeploy workload via internal NATS connection failed",
slog.String("agentId", a.agentId), slog.String("error", err.Error()))
a.log.Warn("request to undeploy workload via internal NATS connection failed", slog.String("agent_id", a.agentID), slog.String("error", err.Error()))
return err
}
return nil
}

func (a *AgentClient) awaitHandshake(agentId string) {
func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, subject string, data []byte) (*nats.Msg, error) {
intmsg := nats.NewMsg(fmt.Sprintf("agentint.%s.trigger", a.agentID))
// TODO: inject tracer context into message header
intmsg.Header.Add(nexTriggerSubject, subject)
intmsg.Data = data

cctx, childSpan := tracer.Start(
ctx,
"internal request",
trace.WithSpanKind(trace.SpanKindClient),
)

otel.GetTextMapPropagator().Inject(cctx, propagation.HeaderCarrier(intmsg.Header))

// TODO: make the agent's exec handler extract and forward the otel context
// so it continues in the host services like kv, obj, msg, etc
resp, err := a.nc.RequestMsg(intmsg, time.Millisecond*10000) // FIXME-- make timeout configurable
childSpan.End()

return resp, err
}

func (a *AgentClient) awaitHandshake(agentID string) {
<-time.After(a.handshakeTimeout)
if !a.handshakeReceived.Load() {
a.handshakeTimedOut(agentId)
a.handshakeTimedOut(agentID)
}
}

func (a *AgentClient) handleHandshake(msg *nats.Msg) {
var req HandshakeRequest
var req *HandshakeRequest
err := json.Unmarshal(msg.Data, &req)
if err != nil {
a.log.Error("Failed to handle agent handshake", slog.String("agentId", *req.MachineID), slog.String("message", *req.Message))
a.log.Error("Failed to handle agent handshake", slog.String("agent_id", *req.ID), slog.String("message", *req.Message))
return
}

a.log.Info("Received agent handshake", slog.String("agentId", *req.MachineID), slog.String("message", *req.Message))
a.log.Info("Received agent handshake", slog.String("agent_id", *req.ID), slog.String("message", *req.Message))

resp, _ := json.Marshal(&HandshakeResponse{})

Expand All @@ -147,13 +210,13 @@ func (a *AgentClient) handleHandshake(msg *nats.Msg) {
}

a.handshakeReceived.Store(true)
a.handshakeSucceeded(*req.MachineID)
a.handshakeSucceeded(*req.ID)
}

func (a *AgentClient) handleAgentEvent(msg *nats.Msg) {
// agentint.{agentId}.events.{type}
// agentint.{agentID}.events.{type}
tokens := strings.Split(msg.Subject, ".")
agentId := tokens[1]
agentID := tokens[1]

var evt cloudevents.Event
err := json.Unmarshal(msg.Data, &evt)
Expand All @@ -162,13 +225,13 @@ func (a *AgentClient) handleAgentEvent(msg *nats.Msg) {
return
}

a.log.Info("Received agent event", slog.String("agentId", agentId), slog.String("type", evt.Type()))
a.eventReceived(agentId, evt)
a.log.Info("Received agent event", slog.String("agent_id", agentID), slog.String("type", evt.Type()))
a.eventReceived(agentID, evt)
}

func (a *AgentClient) handleAgentLog(msg *nats.Msg) {
tokens := strings.Split(msg.Subject, ".")
agentId := tokens[1]
agentID := tokens[1]

var logentry LogEntry
err := json.Unmarshal(msg.Data, &logentry)
Expand All @@ -177,6 +240,6 @@ func (a *AgentClient) handleAgentLog(msg *nats.Msg) {
return
}

a.log.Debug("Received agent log", slog.String("agentId", agentId), slog.String("log", logentry.Text))
a.logReceived(agentId, logentry)
a.log.Debug("Received agent log", slog.String("agent_id", agentID), slog.String("log", logentry.Text))
a.logReceived(agentID, logentry)
}
13 changes: 7 additions & 6 deletions internal/agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import (
"github.com/nats-io/nats.go"
)

// Agent handshake subject
const NexAgentSubjectHandshake = "agentint.handshake"

// Executable Linkable Format execution provider
const NexExecutionProviderELF = "elf"

Expand Down Expand Up @@ -87,6 +84,10 @@ type DeployRequest struct {
Errors []error `json:"errors,omitempty"`
}

func (request *DeployRequest) IsEssential() bool {
return request.Essential != nil && *request.Essential
}

// Returns true if the run request supports essential flag
func (request *DeployRequest) SupportsEssential() bool {
return strings.EqualFold(*request.WorkloadType, "elf") ||
Expand All @@ -100,7 +101,7 @@ func (request *DeployRequest) SupportsTriggerSubjects() bool {
len(request.TriggerSubjects) > 0
}

func (r *DeployRequest) Validate() bool {
func (r *DeployRequest) Validate() error {
var err error

if r.WorkloadName == nil {
Expand All @@ -127,7 +128,7 @@ func (r *DeployRequest) Validate() bool {
err = errors.Join(err, errors.New("at least one trigger subject is required for this workload type"))
}

return err == nil
return err
}

type DeployResponse struct {
Expand All @@ -136,7 +137,7 @@ type DeployResponse struct {
}

type HandshakeRequest struct {
MachineID *string `json:"machine_id"`
ID *string `json:"id"`
StartTime time.Time `json:"start_time"`
Message *string `json:"message,omitempty"`
}
Expand Down
8 changes: 4 additions & 4 deletions internal/control-api/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ type StopRequest struct {
}

type StopResponse struct {
Stopped bool `json:"stopped"`
MachineId string `json:"machine_id"`
Issuer string `json:"issuer"`
Name string `json:"name"`
Stopped bool `json:"stopped"`
ID string `json:"id"`
Issuer string `json:"issuer"`
Name string `json:"name"`
}

func NewStopRequest(workloadId string, name string, targetNode string, issuer nkeys.KeyPair) (*StopRequest, error) {
Expand Down
23 changes: 12 additions & 11 deletions internal/control-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ const (
)

type RunResponse struct {
Started bool `json:"started"`
MachineId string `json:"machine_id"`
Issuer string `json:"issuer"`
Name string `json:"name"`
Started bool `json:"started"`
ID string `json:"id"`
Issuer string `json:"issuer"`
Name string `json:"name"`
}

type PingResponse struct {
Expand Down Expand Up @@ -52,10 +52,11 @@ type InfoResponse struct {
}

type MachineSummary struct {
Id string `json:"id"`
Healthy bool `json:"healthy"`
Uptime string `json:"uptime"`
Workload WorkloadSummary `json:"workload,omitempty"`
Id string `json:"id"`
Healthy bool `json:"healthy"`
Uptime string `json:"uptime"`
Namespace string `json:"namespace,omitempty"`
Workload WorkloadSummary `json:"workload,omitempty"`
}

type WorkloadSummary struct {
Expand All @@ -82,9 +83,9 @@ type EmittedLog struct {
}

type RawLog struct {
Text string `json:"text"`
Level slog.Level `json:"level"`
MachineId string `json:"machine_id"`
Text string `json:"text"`
Level slog.Level `json:"level"`
ID string `json:"id"`
}

// Note this a wrapper to add context to a cloud event
Expand Down
1 change: 1 addition & 0 deletions internal/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type NodeConfiguration struct {
BinPath []string `json:"bin_path"`
CNI CNIDefinition `json:"cni"`
DefaultResourceDir string `json:"default_resource_dir"`
NoSandbox bool `json:"no_sandbox,omitempty"`
ForceDepInstall bool `json:"-"`
InternalNodeHost *string `json:"internal_node_host,omitempty"`
InternalNodePort *int `json:"internal_node_port"`
Expand Down
Loading

0 comments on commit 3d386f2

Please sign in to comment.