Skip to content

Commit

Permalink
Preliminary support for account security in nats internal server (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidaddict authored Jun 5, 2024
1 parent 0186a22 commit 69da829
Show file tree
Hide file tree
Showing 28 changed files with 1,078 additions and 349 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ test/panicker/panicker
pnats
.idea

_spec
_spec

/internal/node/internal-nats/pnats
/nex/pnats
106 changes: 69 additions & 37 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ import (

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/synadia-io/nex/agent/providers"
agentapi "github.com/synadia-io/nex/internal/agent-api"
"github.com/synadia-io/nex/internal/models"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

const defaultAgentHandshakeTimeoutMillis = 500
const runloopSleepInterval = 250 * time.Millisecond
const runloopTickInterval = 2500 * time.Millisecond
const workloadExecutionSleepTimeoutMillis = 1000
// Timing values used by the agent and the key of the cached workload artifact.
const (
// Timeout, in milliseconds, applied to each handshake request sent over NATS.
defaultAgentHandshakeTimeoutMillis = 500
// NOTE(review): presumably the pause between run loop iterations; its use is not visible here — confirm.
runloopSleepInterval = 250 * time.Millisecond
// NOTE(review): presumably the period of the run loop ticker; its use is not visible here — confirm.
runloopTickInterval = 2500 * time.Millisecond
// NOTE(review): presumably a workload execution wait in milliseconds; its use is not visible here — confirm.
workloadExecutionSleepTimeoutMillis = 1000
// Object name passed to the cache bucket's GetFile when fetching the workload artifact.
workloadCacheFileKey = "workload"
)

// Agent facilitates communication between the nex agent running in the firecracker VM
// and the nex node by way of a configured internal NATS server. Agent instances provide
Expand Down Expand Up @@ -61,7 +65,7 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
metadata, err = GetMachineMetadata()
}
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get machien metadata: %s\n", err)
fmt.Fprintf(os.Stderr, "failed to get machine metadata: %s\n", err)
return nil, fmt.Errorf("failed to get machine metadata: %s", err)
}

Expand All @@ -70,35 +74,15 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
return nil, fmt.Errorf("invalid metadata: %v", metadata.Errors)
}

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", *metadata.NodeNatsHost, *metadata.NodeNatsPort))
if err != nil {
fmt.Fprintf(os.Stderr, "failed to connect to shared NATS: %s", err)
return nil, err
}

js, err := nc.JetStream()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s", err)
return nil, err
}

bucket, err := js.ObjectStore(agentapi.WorkloadCacheBucket)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s", err)
return nil, err
}

return &Agent{
agentLogs: make(chan *agentapi.LogEntry, 64),
eventLogs: make(chan *cloudevents.Event, 64),
// sandbox defaults to true, only way to override that is with an explicit 'false'
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
cacheBucket: bucket,
md: metadata,
nc: nc,
started: time.Now().UTC(),
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
md: metadata,
started: time.Now().UTC(),
}, nil
}

Expand Down Expand Up @@ -151,11 +135,11 @@ func (a *Agent) requestHandshake() error {
}
raw, _ := json.Marshal(msg)

resp, err := a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
resp, err := a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
time.Sleep(time.Millisecond * 50)
resp, err = a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
resp, err = a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
}

if err != nil {
Expand Down Expand Up @@ -191,9 +175,9 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e
tempFile = fmt.Sprintf("%s.exe", tempFile)
}

err := a.cacheBucket.GetFile(*req.WorkloadName, tempFile)
err := a.cacheBucket.GetFile(workloadCacheFileKey, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to write workload artifact to temp dir: %s", err)
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.LogError(msg)
return nil, errors.New(msg)
}
Expand All @@ -218,7 +202,7 @@ func (a *Agent) dispatchEvents() {
continue
}

subject := fmt.Sprintf("agentint.%s.events.%s", *a.md.VmID, entry.Type())
subject := fmt.Sprintf("hostint.%s.events.%s", *a.md.VmID, entry.Type())
err = a.nc.Publish(subject, bytes)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to publish event: %s", err.Error())
Expand All @@ -239,7 +223,7 @@ func (a *Agent) dispatchLogs() {
continue
}

subject := fmt.Sprintf("agentint.%s.logs", *a.md.VmID)
subject := fmt.Sprintf("hostint.%s.logs", *a.md.VmID)
err = a.nc.Publish(subject, bytes)
if err != nil {
continue
Expand Down Expand Up @@ -330,9 +314,17 @@ func (a *Agent) handleUndeploy(m *nats.Msg) {
}

// handlePing answers an incoming ping message with the literal payload "OK".
// The error returned by Respond is intentionally discarded.
func (a *Agent) handlePing(m *nats.Msg) {
	// a.LogDebug(fmt.Sprintf("received ping on subject: %s", m.Subject))
	okPayload := []byte("OK")
	_ = m.Respond(okPayload)
}

// Agent instances subscribe to the following `agentint.>` subjects,
// which are exported dynamically by each `<agent_id>` account on the
// configured internal NATS connection for consumption by the nex node:
//
// - agentint.<agent_id>.deploy
// - agentint.<agent_id>.undeploy
// - agentint.<agent_id>.ping
func (a *Agent) init() error {
a.installSignalHandlers()

Expand All @@ -341,7 +333,13 @@ func (a *Agent) init() error {
propagation.Baggage{},
))

err := a.requestHandshake()
err := a.initNATS()
if err != nil {
a.LogError(fmt.Sprintf("Failed to initialize NATS connection: %s", err))
return err
}

err = a.requestHandshake()
if err != nil {
a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err))
return err
Expand Down Expand Up @@ -373,6 +371,40 @@ func (a *Agent) init() error {
return nil
}

// initNATS connects the agent to the internal NATS server described by the
// machine metadata, authenticating with the nkey derived from the metadata
// seed, and then resolves the shared workload cache object store.
//
// On success a.nc and a.cacheBucket are populated. An error is returned when
// the metadata is incomplete, the seed cannot be parsed, the public key cannot
// be derived, the connection fails, or the JetStream context / object store
// cannot be obtained.
//
// The nkey seed is private key material: it is never written to the logs or
// embedded in returned errors.
func (a *Agent) initNATS() error {
	// Guard the pointer fields so incomplete metadata yields an error, not a panic.
	if a.md == nil || a.md.NodeNatsHost == nil || a.md.NodeNatsPort == nil {
		return errors.New("invalid machine metadata: internal NATS host and port are required")
	}
	if a.md.NodeNatsNkeySeed == nil {
		return errors.New("invalid machine metadata: internal NATS nkey seed is required")
	}

	url := fmt.Sprintf("nats://%s:%d", *a.md.NodeNatsHost, *a.md.NodeNatsPort)

	pair, err := nkeys.FromSeed([]byte(*a.md.NodeNatsNkeySeed))
	if err != nil {
		// Report the parse failure itself; do not echo the secret seed.
		fmt.Fprintf(os.Stderr, "invalid nkey seed: %s\n", err)
		return fmt.Errorf("invalid nkey seed: %w", err)
	}

	pk, err := pair.PublicKey()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to derive public key from nkey seed: %s\n", err)
		return fmt.Errorf("failed to derive public key from nkey seed: %w", err)
	}

	// The signature callback is invoked with the server-provided nonce.
	nc, err := nats.Connect(url, nats.Nkey(pk, func(nonce []byte) ([]byte, error) {
		fmt.Fprintf(os.Stdout, "Attempting to sign NATS server nonce for internal NATS connection; public key: %s", pk)
		return pair.Sign(nonce)
	}))
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to connect to shared NATS: %s", err)
		return err
	}
	a.nc = nc
	fmt.Printf("Connected to internal NATS: %s\n", url)

	js, err := a.nc.JetStream()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s", err)
		return err
	}

	a.cacheBucket, err = js.ObjectStore(agentapi.WorkloadCacheBucket)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s", err)
		return err
	}

	return nil
}

func (a *Agent) installSignalHandlers() {
signal.Reset(syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
resetSIGUSR()
Expand Down
11 changes: 7 additions & 4 deletions agent/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const nexEnvSandbox = "NEX_SANDBOX"
const nexEnvWorkloadID = "NEX_WORKLOADID"
const nexEnvNodeNatsHost = "NEX_NODE_NATS_HOST"
const nexEnvNodeNatsPort = "NEX_NODE_NATS_PORT"
const nexEnvNodeNatsSeed = "NEX_NODE_NATS_NKEY_SEED"

const metadataClientTimeoutMillis = 50
const metadataPollingTimeoutMillis = 5000
Expand Down Expand Up @@ -71,6 +72,7 @@ func GetMachineMetadataFromEnv() (*agentapi.MachineMetadata, error) {
vmid := os.Getenv(nexEnvWorkloadID)
host := os.Getenv(nexEnvNodeNatsHost)
port := os.Getenv(nexEnvNodeNatsPort)
seed := os.Getenv(nexEnvNodeNatsSeed)
msg := "Metadata obtained from no-sandbox environment"
p, err := strconv.Atoi(port)
if err != nil {
Expand All @@ -79,10 +81,11 @@ func GetMachineMetadataFromEnv() (*agentapi.MachineMetadata, error) {
}

return &agentapi.MachineMetadata{
VmID: &vmid,
NodeNatsHost: &host,
NodeNatsPort: &p,
Message: &msg,
VmID: &vmid,
NodeNatsHost: &host,
NodeNatsPort: &p,
NodeNatsNkeySeed: &seed,
Message: &msg,
}, nil
}

Expand Down
5 changes: 4 additions & 1 deletion agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (v *V8) Execute(ctx context.Context, payload []byte) ([]byte, error) {
return
}

_, _ = v.stdout.Write([]byte(fmt.Sprintf("calling js function via trigger subject: %s", subject)))
val, err = fn.Call(v8ctx.Global(), argv1, argv2)
if err != nil {
errs <- err
Expand Down Expand Up @@ -543,6 +544,8 @@ func (v *V8) newMessagingObjectTemplate(ctx context.Context) *v8.ObjectTemplate
messaging := v8.NewObjectTemplate(v.iso)

_ = messaging.Set(hostServicesMessagingPublishFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {
_, _ = v.stdout.Write([]byte(fmt.Sprintf("attempting to publish msg via %s", v.nc.Servers()[0])))

args := info.Args()
if len(args) != 2 {
val, _ := v8.NewValue(v.iso, "subject and payload are required")
Expand Down Expand Up @@ -847,7 +850,7 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8,

hsclient := hostservices.NewHostServicesClient(
params.NATSConn,
time.Second*2,
time.Second*5, // FIXME-- make configurable
*params.Namespace,
*params.WorkloadName,
params.VmID,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/nats-io/nats.go v1.34.1
github.com/nats-io/natscli v0.1.4
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -109,7 +110,6 @@ require (
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
Expand Down
5 changes: 3 additions & 2 deletions host-services/builtins/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (k *KeyValueService) HandleRequest(
method string,
workloadName string,
metadata map[string]string,
request []byte) (hostservices.ServiceResult, error) {

request []byte,
) (hostservices.ServiceResult, error) {
switch method {
case kvServiceMethodGet:
return k.handleGet(workloadId, workloadName, request, metadata, namespace)
Expand Down Expand Up @@ -208,5 +208,6 @@ func (k *KeyValueService) resolveKeyValueStore(namespace, workload string) (nats
}
}

k.log.Debug("Resolved key/value store for KV host service", slog.String("name", kvStoreName), slog.String("bucket", kvStore.Bucket()))
return kvStore, nil
}
2 changes: 1 addition & 1 deletion host-services/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewHostServicesClient(nc *nats.Conn, timeout time.Duration, namespace, work
}

func (c *HostServicesClient) PerformRPC(ctx context.Context, service string, method string, payload []byte, metadata map[string]string) (ServiceResult, error) {
subject := fmt.Sprintf("agentint.%s.rpc.%s.%s.%s.%s",
subject := fmt.Sprintf("hostint.%s.rpc.%s.%s.%s.%s",
c.workloadId,
c.namespace,
c.workloadName,
Expand Down
25 changes: 16 additions & 9 deletions host-services/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ import (
)

type HostServicesServer struct {
log *slog.Logger
ncInternal *nats.Conn
services map[string]HostService
tracer trace.Tracer
log *slog.Logger
nc *nats.Conn
services map[string]HostService
tracer trace.Tracer
}

func NewHostServicesServer(nc *nats.Conn, log *slog.Logger, tracer trace.Tracer) *HostServicesServer {
return &HostServicesServer{
ncInternal: nc,
log: log,
services: make(map[string]HostService),
tracer: tracer,
log: log,
nc: nc,
services: make(map[string]HostService),
tracer: tracer,
}
}

Expand All @@ -49,12 +49,19 @@ func (h *HostServicesServer) AddService(name string, svc HostService, config jso
return nil
}

// Host services server instances subscribe to the following `hostint.>` subjects,
// which are exported by the `nexnode` account on the configured internal
// NATS connection for consumption by agents:
//
// - hostint.<agent_id>.rpc.<namespace>.<workloadName>.<service>.<method>
func (h *HostServicesServer) Start() error {
_, err := h.ncInternal.Subscribe("agentint.*.rpc.*.*.*.*", h.handleRPC)
_, err := h.nc.Subscribe("hostint.*.rpc.*.*.*.*", h.handleRPC)
if err != nil {
h.log.Warn("Failed to create Host services rpc subscription", slog.String("error", err.Error()))
return err
}

h.log.Debug("Host services rpc subscription created", slog.String("address", h.nc.ConnectedAddr()))
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions host-services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ func ServiceResultPass(code uint, message string, data []byte) ServiceResult {

type HostService interface {
Initialize(json.RawMessage) error
HandleRequest(namespace string,

HandleRequest(
namespace string,
workloadId string,
method string,
workloadName string,
metadata map[string]string,
request []byte) (ServiceResult, error)
request []byte,
) (ServiceResult, error)
}
Loading

0 comments on commit 69da829

Please sign in to comment.