tweaks

autodidaddict committed Nov 30, 2023
1 parent 10bbfe1 commit d0a42d2
Showing 8 changed files with 28 additions and 19 deletions.
12 changes: 0 additions & 12 deletions agent-api/README.md
@@ -1,14 +1,2 @@
 # Agent API
 This is the API used for communication between the agent (the process running inside the Firecracker VM) and the host (`nex-node`). This API contains operations to subscribe to logs and events, as well as a health query and, of course, a function to start and run a workload.
-
-Most of this package is generated from protobufs, but there is also a client that can be used to communicate with `nex-agent`s.
-
-# Why gRPC?
-It's worth pointing out that end-user developers will never see or interact with this protocol. It's _only_ to be used for internal comms between a node host and the agent inside Firecracker. The only people exposed to the use of gRPC here would be contributors.
-
-There are a number of requirements that drove the gRPC decision.
-
-* **Workload delivery** - We need to send a workload into a running Firecracker instance. It's already running at the time of workload assignment because we're keeping it warm. We can either send it over the network, or we can do it through a block device mount. For the latter, we'd have to fill an empty file, format it as `ext4`, mount it, write the workload binary to it, unmount it, and then submit the drive to the machine. Since Firecracker machines can't add drives while running, using the drive mount option means we can't keep microVMs warm. Firecracker cannot mount shared directories (unless we start doing hybrid shenanigans with Docker).
-* **Security & Multi-tenancy** - Having the agent be a service hosting an endpoint on its privately assigned Firecracker VM allows us to do multi-tenancy without the need for complex NATS account and credential management. If the node were to either embed or start a NATS server specifically for allowing communication with agents, we would need to manage accounts and user credentials for each Firecracker VM/workload, and that adds more potential failure paths than I'm comfortable with, not to mention the complexity burden on developers. We could also do it with specific users, but one slipup could expose private data to the wrong tenant.
-* **Size** - We need the individual VMs to be as small as humanly possible. We could start a NATS server inside each VM and use it instead of gRPC to communicate with the agent inside. However, if we do this, we're incurring (at least) a **20MB** RAM penalty _per Firecracker VM_. This won't scale (**4GB** of RAM per **255** workloads).
-* **Isolation** - The agent and node host need to be able to communicate with each other and continue operation during network partition events. If the agent and node both use a "remote" NATS server as their means of communication, not only would they be incurring the at-least-1-hop latency penalty, but neither would be able to talk to the other when disconnected from the remote server/cluster.
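As a purely hypothetical illustration of the surface the removed README describes (log/event subscriptions, a health query, and workload start), driving such a client might look like the sketch below. The interface and method names are invented for this example and are not the generated agent-api client.

```go
package agentapiexample

import (
	"context"
	"fmt"
	"log"
)

// AgentClient is an invented stand-in for the client mentioned in the README;
// the real generated API will differ.
type AgentClient interface {
	HealthCheck(ctx context.Context) error
	SubscribeLogs(ctx context.Context, fn func(line string)) error
	StartWorkload(ctx context.Context, name string, binary []byte) error
}

// deploy shows the rough call sequence a node host would drive against an agent.
func deploy(ctx context.Context, c AgentClient, name string, binary []byte) error {
	if err := c.HealthCheck(ctx); err != nil {
		return fmt.Errorf("agent not healthy: %w", err)
	}
	if err := c.SubscribeLogs(ctx, func(line string) { log.Println(line) }); err != nil {
		return err
	}
	return c.StartWorkload(ctx, name, binary)
}
```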
1 change: 1 addition & 0 deletions agent-api/types.go
@@ -12,6 +12,7 @@ type WorkRequest struct {
     Hash         string            `json:"hash"`
     TotalBytes   int               `json:"total_bytes"`
     Environment  map[string]string `json:"environment"`
+    WorkloadType string            `json:"workload_type,omitempty"`
 }
 
 type WorkResponse struct {
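For context, a hedged sketch of the wire effect of the new `omitempty` field, using a local mirror of only the fields visible in this hunk (the real `WorkRequest` in agent-api/types.go also carries at least `WorkloadName`):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of the WorkRequest fields visible in the diff above.
type WorkRequest struct {
	Hash         string            `json:"hash"`
	TotalBytes   int               `json:"total_bytes"`
	Environment  map[string]string `json:"environment"`
	WorkloadType string            `json:"workload_type,omitempty"`
}

func main() {
	withType, _ := json.Marshal(WorkRequest{WorkloadType: "elf"})
	fmt.Println(string(withType))
	// {"hash":"","total_bytes":0,"environment":null,"workload_type":"elf"}

	// Thanks to omitempty, a dispatcher that predates the field simply omits the key.
	withoutType, _ := json.Marshal(WorkRequest{})
	fmt.Println(string(withoutType))
	// {"hash":"","total_bytes":0,"environment":null}
}
```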
4 changes: 2 additions & 2 deletions control-api/events.go
@@ -14,12 +14,12 @@ type AgentStartedEvent struct {
 }
 
 type WorkloadStartedEvent struct {
-    Name       string `json:"name"`
+    Name       string `json:"workload_name"`
     TotalBytes int    `json:"total_bytes"`
 }
 
 type WorkloadStoppedEvent struct {
-    Name    string `json:"name"`
+    Name    string `json:"workload_name"`
     Code    int    `json:"code"`
     Message string `json:"message"`
 }
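Since the JSON tag changes from `name` to `workload_name`, consumers of these events must decode the new key; a minimal sketch of the consumer side, assuming the payload shape shown in the struct above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of WorkloadStartedEvent as it appears after this change.
type WorkloadStartedEvent struct {
	Name       string `json:"workload_name"`
	TotalBytes int    `json:"total_bytes"`
}

func main() {
	// Payload shape emitted after this commit; a consumer still reading "name"
	// would now see an empty Name field.
	payload := []byte(`{"workload_name":"echoservice","total_bytes":1048576}`)

	var evt WorkloadStartedEvent
	if err := json.Unmarshal(payload, &evt); err != nil {
		panic(err)
	}
	fmt.Printf("%s started (%d bytes)\n", evt.Name, evt.TotalBytes)
}
```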
7 changes: 7 additions & 0 deletions nex-agent/nodeclient.go
@@ -21,6 +21,7 @@ var (
 
 const (
     AdvertiseSubject = "agentint.advertise"
+    MyWorkloadType   = "elf"
 )
 
 type NodeClient struct {
@@ -85,6 +86,12 @@ func handleWorkDispatched(node *NodeClient) func(m *nats.Msg) {
             workAck(m, false, msg)
             return
         }
+        if request.WorkloadType != MyWorkloadType {
+            // silently do nothing, allowing other potential agents that do
+            // handle that workload to respond
+            LogDebug(fmt.Sprintf("Ignoring request to start workload of type '%s'", request.WorkloadType))
+            return
+        }
 
         tempFile := path.Join(os.TempDir(), "workload")
         err = node.cacheBucket.GetFile(request.WorkloadName, tempFile)
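A standalone sketch of the dispatch-filtering pattern this hunk implements: each agent subscribes to its own internal dispatch subject, decodes the request, and stays silent for workload types it does not serve. The subject layout (`agentint.<vmID>.workdispatch`) comes from machine_mgr.go in this same commit; the struct mirror and the ack JSON shape are assumptions for illustration, not the actual nex-agent code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

const myWorkloadType = "elf" // this illustrative agent only runs ELF binaries

type workRequest struct {
	WorkloadName string `json:"workload_name"`
	WorkloadType string `json:"workload_type,omitempty"`
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	// Subject layout taken from machine_mgr.go: agentint.<vmID>.workdispatch.
	vmID := "vm-1234" // illustrative
	subject := fmt.Sprintf("agentint.%s.workdispatch", vmID)

	_, err = nc.Subscribe(subject, func(m *nats.Msg) {
		var req workRequest
		if err := json.Unmarshal(m.Data, &req); err != nil {
			return
		}
		if req.WorkloadType != myWorkloadType {
			// Not ours: stay silent so an agent that does handle this type can reply.
			log.Printf("ignoring workload of type %q", req.WorkloadType)
			return
		}
		// Accept the work; a real agent would fetch and launch the binary here.
		// The ack shape below is assumed, not the actual WorkResponse encoding.
		_ = m.Respond([]byte(`{"accepted":true}`))
	})
	if err != nil {
		log.Fatal(err)
	}
	select {} // block forever
}
```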
8 changes: 7 additions & 1 deletion nex-cli/watch.go
@@ -40,11 +40,12 @@ func WatchEvents(ctx *fisk.ParseContext) error {
     systemSub := fmt.Sprintf("%s.events.system.*", controlapi.APIPrefix) // capture events that come from nodes themselves, e.g. system namespace
     _, err = nc.Subscribe(subscribeSubject, handleEventEntry(log))
     if err != nil {
+        log.WithField("namespace_filter", namespaceFilter).WithError(err).Error("Failed to subscribe to namespace events")
         return err
     }
     _, err = nc.Subscribe(systemSub, handleEventEntry(log))
     if err != nil {
-        return err
+        log.Warn("Failed to subscribe to system namespace. Node events will be unavailable")
     }
 
     nctx := context.Background()
@@ -118,6 +119,8 @@ func handleEventEntry(log *logrus.Logger) func(m *nats.Msg) {
 
         // TODO: There's likely something we can do to simplify/automate this with reflection or maybe codegen
         entry := log.WithField("namespace", namespace).WithField("event_type", eventType).WithFields(event.Extensions())
+
+        // Extract meaningful fields from well-known events
         switch eventType {
         case controlapi.AgentStartedEventType:
             evt := &controlapi.AgentStartedEvent{}
@@ -185,6 +188,9 @@ func handleLogEntry(log *logrus.Logger) func(m *nats.Msg) {
             log.WithError(err).Error("Log entry deserialization failure")
             return
         }
+        if logEntry.Level == 0 {
+            logEntry.Level = logrus.DebugLevel
+        }
 
         log.WithField("namespace", namespace).
             WithField("node", node).
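The new `logEntry.Level == 0` guard matters because logrus levels count down in severity from `PanicLevel` (0) to `TraceLevel` (6); a zero value from an emitter that never set a level would otherwise render every line as a panic. A small sketch of the remapping, assuming the entry carries a plain `logrus.Level`:

```go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

func main() {
	// logrus levels are ordered from most to least severe:
	// Panic=0, Fatal=1, Error=2, Warn=3, Info=4, Debug=5, Trace=6.
	var level logrus.Level // zero value == PanicLevel

	fmt.Println(level) // "panic" -- misleading for an entry that simply omitted its level

	if level == 0 {
		level = logrus.DebugLevel
	}
	fmt.Println(level) // "debug"
}
```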
2 changes: 2 additions & 0 deletions nex-node/cmd/nex-node/main.go
@@ -1,3 +1,5 @@
+//go:build linux
+
 package main
 
 import (
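The new `//go:build linux` constraint keeps nex-node, which drives Firecracker and other Linux-only facilities, from being compiled on other platforms. Purely as a hypothetical companion (not part of this commit), a stub under the inverse constraint is a common way to give a clearer error on unsupported platforms:

```go
//go:build !linux

// Hypothetical main_unsupported.go: prints a clear message instead of a pile of
// compiler failures when someone builds nex-node on macOS or Windows.
package main

import (
	"fmt"
	"os"
)

func main() {
	fmt.Fprintln(os.Stderr, "nex-node requires Linux (it manages Firecracker microVMs)")
	os.Exit(1)
}
```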
2 changes: 1 addition & 1 deletion nex-node/controlapi.go
@@ -191,7 +191,7 @@ func handleRun(api *ApiListener) func(m *nats.Msg) {
 
         if err != nil {
             api.log.WithError(err).Error("Failed to start workload in VM")
-            respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unable to submit workload to agent process: %s", err))
+            respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unable to start workload: %s", err))
             return
         }
         api.log.WithField("workload", workloadName).WithField("vmid", runningVm.vmmID).Info("Work accepted")
11 changes: 8 additions & 3 deletions nex-node/machine_mgr.go
@@ -89,23 +89,28 @@ func (m *MachineManager) DispatchWork(vm *runningFirecracker, workloadName strin
     req := agentapi.WorkRequest{
         WorkloadName: workloadName,
         Hash:         "",
-        TotalBytes:   0,
+        TotalBytes:   0, // TODO: make real
+        WorkloadType: request.WorkloadType,
         Environment:  request.WorkloadEnvironment,
     }
     bytes, _ := json.Marshal(req)
 
     subject := fmt.Sprintf("agentint.%s.workdispatch", vm.vmmID)
     resp, err := m.ncInternal.Request(subject, bytes, 1*time.Second)
     if err != nil {
-        return err
+        if errors.Is(err, os.ErrDeadlineExceeded) {
+            return errors.New("timed out waiting for acknowledgement of work dispatch")
+        } else {
+            return fmt.Errorf("failed to submit request for work: %s", err)
+        }
     }
     var workResponse agentapi.WorkResponse
     err = json.Unmarshal(resp.Data, &workResponse)
     if err != nil {
         return err
     }
     if !workResponse.Accepted {
-        return fmt.Errorf("workload not accepted by agent: %s", workResponse.Message)
+        return fmt.Errorf("workload rejected by agent: %s", workResponse.Message)
     }
 
     vm.workloadStarted = time.Now().UTC()
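A hedged sketch of the three failure modes DispatchWork now distinguishes (an ack timeout, any other transport error, and an explicit rejection by the agent), using a local stand-in for `agentapi.WorkResponse` whose field names come from the diff but whose JSON tags are assumed:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
)

// Stand-in for agentapi.WorkResponse; tags assumed for illustration.
type workResponse struct {
	Accepted bool   `json:"accepted"`
	Message  string `json:"message"`
}

// interpret mirrors the branching in DispatchWork: a request error may be a
// plain timeout or some other transport failure, and even a successful reply
// can still be a rejection by the agent.
func interpret(reply []byte, reqErr error) error {
	if reqErr != nil {
		if errors.Is(reqErr, os.ErrDeadlineExceeded) {
			return errors.New("timed out waiting for acknowledgement of work dispatch")
		}
		return fmt.Errorf("failed to submit request for work: %s", reqErr)
	}
	var resp workResponse
	if err := json.Unmarshal(reply, &resp); err != nil {
		return err
	}
	if !resp.Accepted {
		return fmt.Errorf("workload rejected by agent: %s", resp.Message)
	}
	return nil
}

func main() {
	fmt.Println(interpret([]byte(`{"accepted":false,"message":"unsupported workload type"}`), nil))
	fmt.Println(interpret(nil, os.ErrDeadlineExceeded))
	fmt.Println(interpret([]byte(`{"accepted":true}`), nil))
}
```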
