Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds block to prevent deploying to un-handshake'd VM, shuts node down if first handshake fails #136

Merged
merged 6 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) {
}

runningVM := <-api.mgr.warmVMs
if _, ok := api.mgr.handshakes[runningVM.vmmID]; !ok {
api.log.Error("Attempted to deploy workload into bad VM (no handshake)",
slog.String("vmmid", runningVM.vmmID),
)
respondFail(controlapi.RunResponseType, m, "Could not deploy workload, VM from pool did not initialize properly")
return
}
workloadName := request.DecodedClaims.Subject

api.log.
Expand Down
280 changes: 5 additions & 275 deletions internal/node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ import (
"sync/atomic"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
agentapi "github.com/synadia-io/nex/internal/agent-api"
controlapi "github.com/synadia-io/nex/internal/control-api"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
Expand Down Expand Up @@ -347,6 +345,10 @@ func (m *MachineManager) awaitHandshake(vmid string) {
for !handshakeOk && !m.stopping() {
if time.Now().UTC().After(timeoutAt) {
m.log.Error("Did not receive NATS handshake from agent within timeout.", slog.String("vmid", vmid))
if len(m.handshakes) == 0 {
m.log.Error("First handshake failed, shutting down to avoid inconsistent behavior")
m.cancel()
}
return
}

Expand All @@ -355,133 +357,6 @@ func (m *MachineManager) awaitHandshake(vmid string) {
}
}

// Called when the node server gets a log entry via internal NATS. Used to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda liked having all of the MachineManager code in the same place. Maybe it doesn't matter too much. @jordan-rash do you have an opinion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I start to itch when we get over the 500 line mark. While yes it's all under the machine manager umbrella, I like having logically related stuff in nearby locations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I start to itch when we get over the 500 line mark. While yes it's all under the machine manager umbrella, I like having logically related stuff in nearby locations.

Makes sense. It would be better if we created a new interface and struct to make things more readable imho instead of separating it into separate files. I will not die on this hill though.

// package and re-emit with additional metadata on $NEX.logs...
func (m *MachineManager) handleAgentLog(msg *nats.Msg) {
	// Subject shape: agentint.{vmID}.logs. Guard the token count so a
	// malformed subject cannot panic the node with an index-out-of-range.
	tokens := strings.Split(msg.Subject, ".")
	if len(tokens) < 2 {
		m.log.Warn("Received a log message on an unexpected subject", slog.String("subject", msg.Subject))
		return
	}
	vmID := tokens[1]

	// Only accept logs from VMs this manager knows about.
	vm, ok := m.allVMs[vmID]
	if !ok {
		m.log.Warn("Received a log message from an unknown VM.")
		return
	}

	var logentry agentapi.LogEntry
	err := json.Unmarshal(msg.Data, &logentry)
	if err != nil {
		m.log.Error("Failed to unmarshal log entry from agent", slog.Any("err", err))
		return
	}

	m.log.Debug("Received agent log", slog.String("vmid", vmID), slog.String("log", logentry.Text))

	// Wrap the agent's entry with node-side metadata before republishing.
	bytes, err := json.Marshal(&emittedLog{
		Text:      logentry.Text,
		Level:     slog.Level(logentry.Level),
		MachineId: vmID,
	})
	if err != nil {
		m.log.Error("Failed to marshal our own log entry", slog.Any("err", err))
		return
	}

	// Append the workload name to the subject when a deployment is attached.
	var workload *string
	if vm.deployRequest != nil {
		workload = vm.deployRequest.WorkloadName
	}

	subject := logPublishSubject(vm.namespace, m.publicKey, vmID, workload)
	_ = m.nc.Publish(subject, bytes) // best effort; log fan-out is not critical
}

// Called when the node server gets an event from the nex agent inside firecracker. The data here is already a fully formed
// cloud event, so all we need to do is unmarshal it, get some metadata, and then republish on $NEX.events...
// If the event reports a stopped workload, the VM is torn down and essential
// workloads that exited non-zero are re-submitted via a control-API DEPLOY request.
func (m *MachineManager) handleAgentEvent(msg *nats.Msg) {
// Subject shape: agentint.{vmid}.events.{type}
tokens := strings.Split(msg.Subject, ".")
vmID := tokens[1]

// Drop events for VMs this manager does not track.
vm, ok := m.allVMs[vmID]
if !ok {
m.log.Warn("Received an event from a VM we don't know about. Rejecting.")
return
}

var evt cloudevents.Event
err := json.Unmarshal(msg.Data, &evt)
if err != nil {
m.log.Error("Failed to deserialize cloudevent from agent", slog.Any("err", err))
return
}

m.log.Info("Received agent event", slog.String("vmid", vmID), slog.String("type", evt.Type()))

// Republish the event into the node's namespace-scoped event space.
err = PublishCloudEvent(m.nc, vm.namespace, evt, m.log)
if err != nil {
m.log.Error("Failed to publish cloudevent", slog.Any("err", err))
return
}

if evt.Type() == agentapi.WorkloadStoppedEventType {
// Workload is done; tear down the VM. Error deliberately ignored — the
// event was already republished and there is no recovery path here.
_ = m.StopMachine(vmID, false)

evtData, err := evt.DataBytes()
if err != nil {
m.log.Error("Failed to read cloudevent data", slog.Any("err", err))
return
}

var workloadStatus *agentapi.WorkloadStatusEvent
err = json.Unmarshal(evtData, &workloadStatus)
if err != nil {
m.log.Error("Failed to unmarshal workload status from cloudevent data", slog.Any("err", err))
return
}

// Essential workloads that failed (non-zero exit) get redeployed.
if vm.isEssential() && workloadStatus.Code != 0 {
m.log.Debug("Essential workload stopped with non-zero exit code",
slog.String("vmid", vmID),
slog.String("namespace", *vm.deployRequest.Namespace),
slog.String("workload", *vm.deployRequest.WorkloadName),
slog.String("workload_type", *vm.deployRequest.WorkloadType))

// Lazily initialize the retry counter on first failure.
if vm.deployRequest.RetryCount == nil {
retryCount := uint(0)
vm.deployRequest.RetryCount = &retryCount
}

*vm.deployRequest.RetryCount += 1

retriedAt := time.Now().UTC()
vm.deployRequest.RetriedAt = &retriedAt

// Rebuild the original deploy request, carrying over retry metadata.
// Marshal error ignored: struct of known-serializable fields.
req, _ := json.Marshal(&controlapi.DeployRequest{
Argv: vm.deployRequest.Argv,
Description: vm.deployRequest.Description,
WorkloadType: vm.deployRequest.WorkloadType,
Location: vm.deployRequest.Location,
WorkloadJwt: vm.deployRequest.WorkloadJwt,
Environment: vm.deployRequest.EncryptedEnvironment,
Essential: vm.deployRequest.Essential,
RetriedAt: vm.deployRequest.RetriedAt,
RetryCount: vm.deployRequest.RetryCount,
SenderPublicKey: vm.deployRequest.SenderPublicKey,
TargetNode: vm.deployRequest.TargetNode,
TriggerSubjects: vm.deployRequest.TriggerSubjects,
JsDomain: vm.deployRequest.JsDomain,
})

// Redeploy by sending a DEPLOY request to this node's own control API.
nodeID, _ := m.kp.PublicKey()
subject := fmt.Sprintf("%s.DEPLOY.%s.%s", controlapi.APIPrefix, vm.namespace, nodeID)
_, err = m.nc.Request(subject, req, time.Millisecond*2500)
if err != nil {
m.log.Error("Failed to redeploy essential workload", slog.Any("err", err))
}
}
}
}

// This handshake uses the request pattern to force a full round trip to ensure connectivity is working properly as
// fire-and-forget publishes from inside the firecracker VM could potentially be lost
func (m *MachineManager) handleHandshake(msg *nats.Msg) {
Expand Down Expand Up @@ -646,141 +521,6 @@ func (m *MachineManager) generateTriggerHandler(vm *runningFirecracker, tsub str
}
}

// publishFunctionExecSucceeded emits a cloud event and a log entry recording
// that a triggered function completed successfully, then flushes NATS so both
// messages are on the wire before returning.
func (m *MachineManager) publishFunctionExecSucceeded(vm *runningFirecracker, tsub string, elapsedNanos int64) error {
	payload := struct {
		Name      string `json:"workload_name"`
		Subject   string `json:"trigger_subject"`
		Elapsed   int64  `json:"elapsed_nanos"`
		Namespace string `json:"namespace"`
	}{
		Name:      *vm.deployRequest.WorkloadName,
		Subject:   tsub,
		Elapsed:   elapsedNanos,
		Namespace: vm.namespace,
	}

	evt := cloudevents.NewEvent()
	evt.SetSource(m.publicKey)
	evt.SetID(uuid.NewString())
	evt.SetTime(time.Now().UTC())
	evt.SetType(agentapi.FunctionExecutionSucceededType)
	evt.SetDataContentType(cloudevents.ApplicationJSON)
	_ = evt.SetData(payload)

	if err := PublishCloudEvent(m.nc, vm.namespace, evt, m.log); err != nil {
		return err
	}

	// Marshal error ignored: struct of plain strings/ints cannot fail.
	logBytes, _ := json.Marshal(emittedLog{
		Text:      fmt.Sprintf("Function %s execution succeeded (%dns)", payload.Name, payload.Elapsed),
		Level:     slog.LevelDebug,
		MachineId: vm.vmmID,
	})

	subject := fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, vm.namespace, m.publicKey, *vm.deployRequest.WorkloadName, vm.vmmID)
	if err := m.nc.Publish(subject, logBytes); err != nil {
		m.log.Error("Failed to publish function exec passed log", slog.Any("err", err))
	}

	return m.nc.Flush()
}

// publishFunctionExecFailed emits a cloud event and a log entry recording a
// failed function execution, then flushes NATS.
//
// The caller-supplied workload name is used consistently in both the event
// payload and the log subject (previously the subject dereferenced
// vm.deployRequest.WorkloadName, which could diverge from the payload and
// panic if the deploy request was nil).
func (m *MachineManager) publishFunctionExecFailed(vm *runningFirecracker, workload string, tsub string, origErr error) error {
	functionExecFailed := struct {
		Name      string `json:"workload_name"`
		Subject   string `json:"trigger_subject"`
		Namespace string `json:"namespace"`
		Error     string `json:"error"`
	}{
		Name:      workload,
		Namespace: vm.namespace,
		Subject:   tsub,
		Error:     origErr.Error(),
	}

	cloudevent := cloudevents.NewEvent()
	cloudevent.SetSource(m.publicKey)
	cloudevent.SetID(uuid.NewString())
	cloudevent.SetTime(time.Now().UTC())
	cloudevent.SetType(agentapi.FunctionExecutionFailedType)
	cloudevent.SetDataContentType(cloudevents.ApplicationJSON)
	_ = cloudevent.SetData(functionExecFailed)

	err := PublishCloudEvent(m.nc, vm.namespace, cloudevent, m.log)
	if err != nil {
		return err
	}

	emitLog := emittedLog{
		Text:      "Function execution failed",
		Level:     slog.LevelError,
		MachineId: vm.vmmID,
	}
	logBytes, _ := json.Marshal(emitLog) // struct of plain strings; cannot fail

	subject := fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, vm.namespace, m.publicKey, workload, vm.vmmID)
	err = m.nc.Publish(subject, logBytes)
	if err != nil {
		m.log.Error("Failed to publish function exec failed log", slog.Any("err", err))
	}

	return m.nc.Flush()
}

// publishMachineStopped writes a workload stopped event for the provided
// firecracker VM. It returns an error when the VM carries no deploy request;
// a deploy request with a blank workload name is silently skipped.
func (m *MachineManager) publishMachineStopped(vm *runningFirecracker) error {
	if vm.deployRequest == nil {
		return errors.New("machine stopped event was not published")
	}

	workloadName := strings.TrimSpace(vm.deployRequest.DecodedClaims.Subject)
	if len(workloadName) == 0 {
		// No workload name to report on; nothing to publish.
		return nil
	}

	payload := struct {
		Name   string `json:"name"`
		Reason string `json:"reason,omitempty"`
		VmId   string `json:"vmid"`
	}{
		Name:   workloadName,
		Reason: "Workload shutdown requested",
		VmId:   vm.vmmID,
	}

	evt := cloudevents.NewEvent()
	evt.SetSource(m.publicKey)
	evt.SetID(uuid.NewString())
	evt.SetTime(time.Now().UTC())
	evt.SetType(agentapi.WorkloadStoppedEventType)
	evt.SetDataContentType(cloudevents.ApplicationJSON)
	_ = evt.SetData(payload)

	if err := PublishCloudEvent(m.nc, vm.namespace, evt, m.log); err != nil {
		return err
	}

	// Marshal error ignored: struct of plain strings cannot fail.
	logBytes, _ := json.Marshal(emittedLog{
		Text:      "Workload stopped",
		Level:     slog.LevelDebug,
		MachineId: vm.vmmID,
	})

	subject := fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, vm.namespace, m.publicKey, workloadName, vm.vmmID)
	if err := m.nc.Publish(subject, logBytes); err != nil {
		m.log.Error("Failed to publish machine stopped event", slog.Any("err", err))
	}

	return m.nc.Flush()
}

func (m *MachineManager) setMetadata(vm *runningFirecracker) error {
return vm.setMetadata(&agentapi.MachineMetadata{
Message: agentapi.StringOrNil("Host-supplied metadata"),
Expand All @@ -793,13 +533,3 @@ func (m *MachineManager) setMetadata(vm *runningFirecracker) error {
// stopping reports whether shutdown of the machine manager has begun.
func (m *MachineManager) stopping() bool {
	return atomic.LoadUint32(&m.closing) != 0
}

func logPublishSubject(namespace string, node string, vm string, workload *string) string {
// $NEX.logs.{namespace}.{node}.{vm}[.{workload name}]
subject := fmt.Sprintf("%s.%s.%s.%s", LogSubjectPrefix, namespace, node, vm)
if workload != nil {
subject = fmt.Sprintf("%s.%s", subject, *workload)
}

return subject
}
Loading
Loading