Skip to content

Commit

Permalink
will the conference WiFi work?? or will it fail?? NO ONE CAN POSSIBLY…
Browse files Browse the repository at this point in the history
… KNOW.
  • Loading branch information
autodidaddict committed Nov 8, 2023
1 parent 1650425 commit 31d3d1a
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 49 deletions.
9 changes: 8 additions & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ tasks:
cmds:
- go build -tags netgo -ldflags '-extldflags "-static"'

echo-service:
dir: examples/echoservice
sources:
- "*.go"
cmds:
- go build -tags netgo -ldflags '-extldflags "-static"'

gen:
deps: [agent-api]

build:
deps: [nex-agent, nex-node, nex-cli]
deps: [nex-agent, nex-node, nex-cli, echo-service]
9 changes: 5 additions & 4 deletions agent-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c AgentClient) Health() (bool, string, error) {
return health.Healthy, health.AgentVersion, nil
}

func (c AgentClient) PostWorkload(workloadPath string) (*WorkloadAck, error) {
func (c AgentClient) PostWorkload(workloadPath string, env map[string]string) (*WorkloadAck, error) {
f, err := os.Open(workloadPath)
if err != nil {
return nil, err
Expand All @@ -114,9 +114,10 @@ func (c AgentClient) PostWorkload(workloadPath string) (*WorkloadAck, error) {
err = stream.Send(&Workload{
ChunkType: &Workload_Header{
Header: &WorkloadMetadata{
Hash: fmt.Sprintf("%x", h.Sum(nil)),
Name: "bob",
TotalBytes: int32(info.Size()),
Hash: fmt.Sprintf("%x", h.Sum(nil)),
RuntimeEnvironment: env,
Name: workloadPath,
TotalBytes: int32(info.Size()),
},
},
})
Expand Down
24 changes: 17 additions & 7 deletions control-api/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewRunRequest(opts ...RequestOption) (*RunRequest, error) {
WorkloadJwt: workloadJwt,
Environment: encryptedEnv,
SenderPublicKey: senderPublic,
TargetNode: reqOpts.targetPublicXKey,
TargetNode: reqOpts.targetNode,
JsDomain: reqOpts.jsDomain,
}

Expand All @@ -66,29 +66,30 @@ func NewRunRequest(opts ...RequestOption) (*RunRequest, error) {

// This will validate a request's workload JWT, decrypt the request environment. It will not
// perform a comparison of the hash found in the claims with a recipient's expected hash
func (request *RunRequest) Validate(myKey nkeys.KeyPair) error {
func (request *RunRequest) Validate(myKey nkeys.KeyPair) (*jwt.GenericClaims, error) {
fmt.Printf("%v", request)
claims, err := jwt.DecodeGeneric(request.WorkloadJwt)
if err != nil {
return err
return nil, fmt.Errorf("could not decode workload JWT: %s", err)
}
request.DecodedClaims = *claims
fmt.Printf("decodeed claims %+v\n", request.DecodedClaims)
if !validWorkloadName.MatchString(claims.Subject) {
return fmt.Errorf("workload name claim ('%s') does not match requirements of all lowercase letters", claims.Subject)
return nil, fmt.Errorf("workload name claim ('%s') does not match requirements of all lowercase letters", claims.Subject)
}

err = request.DecryptRequestEnvironment(myKey)
if err != nil {
return err
return nil, fmt.Errorf("failed to decrypt request environment: %s", err)
}

var vr jwt.ValidationResults
claims.Validate(&vr)
if len(vr.Issues) > 0 || len(vr.Errors()) > 0 {
return errors.New("standard claims within JWT are not valid")
return nil, errors.New("standard claims within JWT are not valid")
}

return nil
return claims, nil

}

Expand Down Expand Up @@ -137,6 +138,7 @@ type requestOptions struct {
targetPublicXKey string
jsDomain string
hash string
targetNode string
}

type RequestOption func(o requestOptions) requestOptions
Expand All @@ -149,6 +151,14 @@ func WorkloadName(name string) RequestOption {
}
}

// Sets the target execution engine node (a public key of type "server") for this requet
func TargetNode(publicKey string) RequestOption {
return func(o requestOptions) requestOptions {
o.targetNode = publicKey
return o
}
}

// Location of the workload. For files in NATS object stores, use nats://BUCKET/key
func Location(fileUrl string) RequestOption {
return func(o requestOptions) requestOptions {
Expand Down
4 changes: 2 additions & 2 deletions control-api/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ func TestEncryption(t *testing.T) {
SenderXKey(myKey),
Issuer(issuerAccount),
Location("nats://MUHBUCKET/muhfile"),
TargetPublicKey(recipientPk),
TargetPublicXKey(recipientPk),
)

request.DecryptRequestEnvironment(recipientKey)
if request.WorkloadEnvironment["TOP_SECRET_LUGGAGE"] != "12345" {
t.Fatalf("Expected a good luggage password, found %s", request.WorkloadEnvironment["TOP_SECRET_LUGGAGE"])
}

err := request.Validate(recipientKey)
_, err := request.Validate(recipientKey)
if err != nil {
t.Fatalf("Expected no error, but got one: %s", err)
}
Expand Down
Binary file added examples/echoservice/echoservice
Binary file not shown.
47 changes: 47 additions & 0 deletions examples/echoservice/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"context"
"fmt"
"os"
"strings"

"github.com/nats-io/nats.go"
services "github.com/nats-io/nats.go/micro"
)

func main() {
ctx := context.Background()

natsUrl := os.Getenv("NATS_URL")
if len(strings.TrimSpace(natsUrl)) == 0 {
natsUrl = nats.DefaultURL
}
fmt.Printf("Using NATS url '%s'", natsUrl)
nc, err := nats.Connect(natsUrl)
if err != nil {
panic(err)
}

// request handler
echoHandler := func(req services.Request) {
req.Respond(req.Data())
}

_, err = services.AddService(nc, services.Config{
Name: "EchoService",
Version: "1.0.0",
// base handler
Endpoint: &services.EndpointConfig{
Subject: "svc.echo",
Handler: services.HandlerFunc(echoHandler),
},
})

if err != nil {
panic(err)
}

<-ctx.Done()

}
5 changes: 0 additions & 5 deletions nex-agent/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ func (api *AgentApiServer) SubmitWorkload(stream agentapi.NexAgent_SubmitWorkloa
if err != nil {
return stream.SendAndClose(&agentapi.WorkloadAck{Error: true, Message: fmt.Sprintf("Failed to write file chunk: %s", err)})
}
agentLogs <- &agentapi.LogEntry{
Source: "nex-agent",
Level: agentapi.LogLevel_LEVEL_DEBUG,
Text: fmt.Sprintf("Received workload chunk %d bytes", len(chunk.Chunk)),
}
totalBytesReceived += len(chunk.Chunk)
case *agentapi.Workload_Header:
agentLogs <- &agentapi.LogEntry{
Expand Down
7 changes: 7 additions & 0 deletions nex-cli/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nexcli
import (
"fmt"
"os"
"strings"

controlapi "github.com/ConnectEverything/nex/control-api"
"github.com/choria-io/fisk"
Expand Down Expand Up @@ -51,6 +52,12 @@ func renderNodeInfo(info *controlapi.InfoResponse, id string) {
cols.AddRow("Version", info.Version)
cols.AddRow("Uptime", info.Uptime)

taglist := make([]string, 0)
for k, v := range info.Tags {
taglist = append(taglist, fmt.Sprintf("%s=%s", k, v))
}
cols.AddRow("Tags", strings.Join(taglist, ", "))

cols.AddSectionTitle("Workloads:")
cols.Indent(2)
for _, m := range info.Machines {
Expand Down
7 changes: 3 additions & 4 deletions nex-cli/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
)

func RunWorkload(ctx *fisk.ParseContext) error {
fmt.Printf("%v\n", Opts)
fmt.Printf("%v\n", RunOpts)
nc, err := generateConnectionFromOpts()
if err != nil {
return err
Expand All @@ -21,7 +19,6 @@ func RunWorkload(ctx *fisk.ParseContext) error {
// Get node info so we can get public xkey from the target for env encryption
nodeInfo, err := nodeClient.NodeInfo(RunOpts.TargetNode)
if err != nil {
fmt.Println("SUCK")
return err
}

Expand Down Expand Up @@ -51,8 +48,10 @@ func RunWorkload(ctx *fisk.ParseContext) error {
controlapi.Environment(RunOpts.Env),
controlapi.Issuer(issuerKp),
controlapi.SenderXKey(xkey),
controlapi.TargetPublicXKey(RunOpts.TargetNode),
controlapi.TargetNode(RunOpts.TargetNode),
controlapi.TargetPublicXKey(targetPublicXkey),
controlapi.WorkloadName(RunOpts.Name),
controlapi.Checksum("abc12345TODOmakethisreal"),
controlapi.WorkloadDescription(RunOpts.Description),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion nex-node/cmd/ac/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func main() {

fmt.Println("Submitting workload")

ack, err := ac.PostWorkload("/home/kevin/lab/firecracker/testworkload/workload")
ack, err := ac.PostWorkload("/home/kevin/lab/firecracker/testworkload/workload", make(map[string]string))
if err != nil {
fmt.Println(err)
panic(err)
Expand Down
40 changes: 32 additions & 8 deletions nex-node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nexnode
import (
"encoding/json"
"fmt"
"os"
"runtime"
"strconv"
"time"
Expand All @@ -13,6 +14,10 @@ import (
"github.com/sirupsen/logrus"
)

var (
workloadSpecs = make(map[string]controlapi.RunRequest)
)

type ApiListener struct {
mgr *MachineManager
nc *nats.Conn
Expand Down Expand Up @@ -44,13 +49,19 @@ func NewApiListener(nc *nats.Conn, log *logrus.Logger, mgr *MachineManager, tags

log.WithField("public_xkey", xkPub).Info("Use this key as the recipient for encrypted run requests")

dname, err := os.MkdirTemp("", "nexnode")
if err != nil {
log.WithError(err).Error("Failed to create temp directory for workload cache")
return nil
}

return &ApiListener{
mgr: mgr,
nc: nc,
log: log,
nodeId: pub,
xk: kp,
payloadCache: NewPayloadCache(nc, log, "."),
payloadCache: NewPayloadCache(nc, log, dname),
start: time.Now().UTC(),
tags: tags,
}
Expand Down Expand Up @@ -80,12 +91,14 @@ func handleRun(api *ApiListener) func(m *nats.Msg) {
}

// If this passes, `DecodedClaims` contains values and WorkloadEnvironment is decrypted
err = request.Validate(api.xk)
decodedClaims, err := request.Validate(api.xk)
if err != nil {
api.log.WithError(err).Error("Invalid run request")
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Invalid run request: %s", err))
return
}
fmt.Printf("POST VALIDATION: %+v\n", request)
request.DecodedClaims = *decodedClaims

// TODO: once we support another location, change this to "GetPayload"
payloadFile, err := api.payloadCache.GetPayloadFromBucket(&request)
Expand All @@ -100,23 +113,33 @@ func handleRun(api *ApiListener) func(m *nats.Msg) {
return
}

err = runningVm.SubmitWorkload(payloadFile, request)
// NOTE this makes a copy of request
_, err = runningVm.agentClient.PostWorkload(payloadFile.Name(), request.WorkloadEnvironment)
if err != nil {
return
}
runningVm.workloadStarted = time.Now().UTC()
runningVm.workloadSpecification = request

if err != nil {
api.log.WithError(err).Error("Failed to start workload in VM")
}
fmt.Printf("NEW workload spec claims %+v\n", runningVm.workloadSpecification.DecodedClaims)

res := controlapi.NewEnvelope(controlapi.RunResponseType, controlapi.RunResponse{
Started: true,
Name: request.DecodedClaims.Subject,
Issuer: request.DecodedClaims.Issuer,
Name: runningVm.workloadSpecification.DecodedClaims.Name,
Issuer: runningVm.workloadSpecification.DecodedClaims.Issuer,
MachineId: runningVm.vmmID,
}, nil)
fmt.Printf("%+v\n", res)
raw, err := json.Marshal(res)
if err != nil {
api.log.WithError(err).Error("Failed to marshal ping response")
} else {
m.Respond(raw)
}
api.mgr.allVms[runningVm.vmmID] = *runningVm
}
}

Expand Down Expand Up @@ -162,19 +185,20 @@ func handleInfo(api *ApiListener) func(m *nats.Msg) {
}

func summarizeMachines(vms *map[string]runningFirecracker) []controlapi.MachineSummary {
machines := make([]controlapi.MachineSummary, len(*vms))
machines := make([]controlapi.MachineSummary, 0)
now := time.Now().UTC()
for _, v := range *vms {
fmt.Printf("VM: %+v\n", v)
machine := controlapi.MachineSummary{
Id: v.vmmID,
Healthy: true, // TODO cache last health status
Uptime: myUptime(now.Sub(v.machineStarted)),
Workload: controlapi.WorkloadSummary{
Name: v.workloadSpecification.DecodedClaims.Name,
Name: v.workloadSpecification.DecodedClaims.Subject,
Description: v.workloadSpecification.Description,
Runtime: myUptime(now.Sub(v.workloadStarted)),
WorkloadType: v.workloadSpecification.WorkloadType,
Hash: v.workloadSpecification.DecodedClaims.Data["hash"].(string),
//Hash: v.workloadSpecification.DecodedClaims.Data["hash"].(string),
},
}

Expand Down
1 change: 1 addition & 0 deletions nex-node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (m *MachineManager) PublicKey() string {

func (m *MachineManager) TakeFromPool() (*runningFirecracker, error) {
running := <-m.warmVms
m.allVms[running.vmmID] = running
return &running, nil
}

Expand Down
Loading

0 comments on commit 31d3d1a

Please sign in to comment.