Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds instrumentation to handling deploy request #303

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions internal/node/controlapi.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nexnode

import (
"context"
"encoding/json"
"fmt"
"log/slog"
Expand All @@ -15,6 +16,9 @@ import (
"github.com/nats-io/nkeys"
"github.com/pkg/errors"
controlapi "github.com/synadia-io/nex/control-api"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// The API listener is the command and control interface for the node server
Expand Down Expand Up @@ -218,47 +222,62 @@ func (api *ApiListener) handleAuction(m *nats.Msg) {
}

func (api *ApiListener) handleDeploy(m *nats.Msg) {
tracer := api.node.telemetry.Tracer
ctx := context.Background()
_, span := tracer.Start(ctx, "Handling deployment request",
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

namespace, err := extractNamespace(m.Subject)
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Invalid subject for workload deployment", slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, "Invalid subject for workload deployment")
return
}
span.SetAttributes(attribute.String("namespace", namespace))

if api.node.IsLameDuck() {
span.SetStatus(codes.Error, "Node is in lame duck mode, rejected deploy request")
respondFail(controlapi.RunResponseType, m, "Node is in lame duck mode. Workload deploy request denied")
return
}

if api.exceedsMaxWorkloadCount() {
span.SetStatus(codes.Error, "Node is at maximum workload limit, rejected deploy request")
respondFail(controlapi.RunResponseType, m, "Node is at maximum workload limit. Deploy request denied")
return
}

var request controlapi.DeployRequest
err = json.Unmarshal(m.Data, &request)
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Failed to deserialize deploy request", slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unable to deserialize deploy request: %s", err))
return
}

if !slices.Contains(api.node.config.WorkloadTypes, request.WorkloadType) {
span.SetStatus(codes.Error, "Unsupported workload type")
api.log.Error("This node does not support the given workload type", slog.String("workload_type", string(request.WorkloadType)))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type on this node: %s", string(request.WorkloadType)))
return
}

if len(request.TriggerSubjects) > 0 && (request.WorkloadType != controlapi.NexWorkloadV8 &&
request.WorkloadType != controlapi.NexWorkloadWasm) { // FIXME -- workload type comparison
span.SetStatus(codes.Error, "Unsupported workload type for trigger subjects")
api.log.Error("Workload type does not support trigger subject registration", slog.String("trigger_subjects", string(request.WorkloadType)))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type for trigger subject registration: %s", string(request.WorkloadType)))
return
}
span.SetAttributes(attribute.String("trigger_subjects", strings.Join(request.TriggerSubjects, ",")))

if len(request.TriggerSubjects) > 0 && len(api.node.config.DenyTriggerSubjects) > 0 {
for _, subject := range request.TriggerSubjects {
if inDenyList(subject, api.node.config.DenyTriggerSubjects) {
span.SetStatus(codes.Error, "Trigger subject space overlaps with blocked subjects")
respondFail(controlapi.RunResponseType, m,
fmt.Sprintf("The trigger subject %s overlaps with subject(s) in this node's deny list", subject))
return
Expand All @@ -269,13 +288,16 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) {
err = request.DecryptRequestEnvironment(api.xk)
if err != nil {
publicKey, _ := api.xk.PublicKey()
span.SetStatus(codes.Error, err.Error())
api.log.Error("Failed to decrypt environment for deploy request", slog.String("public_key", publicKey), slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to decrypt environment for deploy request: %s", err))
return
}
span.SetAttributes(attribute.String("workload_name", request.DecodedClaims.Subject))

decodedClaims, err := request.Validate()
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Invalid deploy request", slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Invalid deploy request: %s", err))
return
Expand All @@ -284,63 +306,76 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) {
request.DecodedClaims = *decodedClaims
if !validateIssuer(request.DecodedClaims.Issuer, api.node.config.ValidIssuers) {
err := fmt.Errorf("invalid workload issuer: %s", request.DecodedClaims.Issuer)
span.SetStatus(codes.Error, err.Error())
api.log.Error("Workload validation failed", slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("%s", err))
return
}

agentClient, err := api.mgr.SelectRandomAgent()
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Failed to get agent client from pool", slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to get agent client from pool: %s", err))
return
}

workloadID := agentClient.ID()
span.SetAttributes(attribute.String("workload_id", workloadID))

span.AddEvent("Started workload download")
numBytes, workloadHash, err := api.mgr.CacheWorkload(workloadID, &request)
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Failed to cache workload bytes", slog.Any("err", err))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to cache workload bytes: %s", err))
return
}
span.AddEvent("Completed workload download")
if api.exceedsMaxWorkloadSize(numBytes) {
span.SetStatus(codes.Error, "Workload file size is too big")
respondFail(controlapi.RunResponseType, m, "Workload file size would exceed node limitations")
return
}
if api.exceedsPerNodeWorkloadSizeMax(numBytes) {
span.SetStatus(codes.Error, "Workload file size exceeds node limitations")
respondFail(controlapi.RunResponseType, m, "Workload file size would exceed node total limitations")
return
}

deployRequest := agentDeployRequestFromControlDeployRequest(&request, namespace, numBytes, *workloadHash)
agentDeployRequest := agentDeployRequestFromControlDeployRequest(&request, namespace, numBytes, *workloadHash)

api.log.
Info("Submitting workload to agent",
slog.String("namespace", namespace),
slog.String("workload", *deployRequest.WorkloadName),
slog.String("workload", *agentDeployRequest.WorkloadName),
slog.Uint64("workload_size", numBytes),
slog.String("workload_sha256", *workloadHash),
slog.String("type", string(request.WorkloadType)),
)

err = api.mgr.DeployWorkload(agentClient, deployRequest)
span.AddEvent("Created agent deploy request")
err = api.mgr.DeployWorkload(agentClient, agentDeployRequest)
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Failed to deploy workload",
slog.String("error", err.Error()),
)
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to deploy workload: %s", err))
return
}
span.AddEvent("Agent deploy request accepted")

if _, ok := api.mgr.handshakes[workloadID]; !ok {
span.SetStatus(codes.Error, "Tried to deploy into non-handshaked agent")
api.log.Error("Attempted to deploy workload into bad process (no handshake)",
slog.String("workload_id", workloadID),
)
respondFail(controlapi.RunResponseType, m, "Could not deploy workload, agent pool did not initialize properly")
return
}
workloadName := request.DecodedClaims.Subject
span.SetAttributes(attribute.String("workload_name", workloadName))

api.log.Info("Workload deployed", slog.String("workload", workloadName), slog.String("workload_id", workloadID))

Expand All @@ -353,8 +388,10 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) {

raw, err := json.Marshal(res)
if err != nil {
span.SetStatus(codes.Error, err.Error())
api.log.Error("Failed to marshal deploy response", slog.Any("err", err))
} else {
span.SetStatus(codes.Ok, "Workload deployed")
_ = m.Respond(raw)
}
}
Expand Down
Loading