diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index 71df34b7..dc5a4cbb 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -1,6 +1,7 @@ package agentapi import ( + "encoding/json" "errors" "io" "strings" @@ -119,6 +120,11 @@ type HandshakeRequest struct { Message *string `json:"message,omitempty"` } +type HostServicesKeyValueRequest struct { + Key *string `json:"key"` + Value *json.RawMessage `json:"value,omitempty"` +} + type MachineMetadata struct { VmID *string `json:"vmid"` NodeNatsHost *string `json:"node_nats_host"` diff --git a/internal/node/machine_mgr.go b/internal/node/machine_mgr.go index 5ecadba7..48084c02 100644 --- a/internal/node/machine_mgr.go +++ b/internal/node/machine_mgr.go @@ -52,6 +52,8 @@ type MachineManager struct { handshakes map[string]string handshakeTimeout time.Duration // TODO: make configurable... + hostServices *HostServices + natsStoreDir string publicKey string } @@ -104,6 +106,13 @@ func NewMachineManager( return nil, err } + m.hostServices = NewHostServices(m, m.nc, m.ncInternal, m.log) + err = m.hostServices.init() + if err != nil { + m.log.Warn("Failed to initialize host services.", slog.Any("err", err)) + return nil, err + } + return m, nil } diff --git a/internal/node/services.go b/internal/node/services.go new file mode 100644 index 00000000..e77629aa --- /dev/null +++ b/internal/node/services.go @@ -0,0 +1,141 @@ +package nexnode + +import ( + "encoding/json" + "fmt" + "log/slog" + "strings" + + "github.com/nats-io/nats.go" + "github.com/synadia-io/nex/internal/node/services" + hostservices "github.com/synadia-io/nex/internal/node/services/lib" +) + +const hostServiceHTTP = "http" +const hostServiceKeyValue = "kv" +const hostServiceMessaging = "messaging" +const hostServiceObjectStore = "objectstore" + +// Host services server implements select functionality which is +// exposed to workloads by way of the agent which makes RPC calls +// via the internal NATS connection +type HostServices struct { + log *slog.Logger + mgr *MachineManager + nc *nats.Conn + ncint *nats.Conn + + http services.HostService + kv services.HostService + messaging services.HostService + object services.HostService +} + +func NewHostServices(mgr *MachineManager, nc, ncint *nats.Conn, log *slog.Logger) *HostServices { + return &HostServices{ + log: log, + mgr: mgr, + nc: nc, + ncint: ncint, + } +} + +func (h *HostServices) init() error { + var err error + + h.http, err = hostservices.NewHTTPService(h.nc, h.log) + if err != nil { + h.log.Error(fmt.Sprintf("failed to initialize http host service: %s", err.Error())) + return err + } else { + h.log.Debug("initialized http host service") + } + + h.kv, err = hostservices.NewKeyValueService(h.nc, h.log) + if err != nil { + h.log.Error(fmt.Sprintf("failed to initialize key/value host service: %s", err.Error())) + return err + } else { + h.log.Debug("initialized key/value host service") + } + + h.messaging, err = hostservices.NewMessagingService(h.nc, h.log) + if err != nil { + h.log.Error(fmt.Sprintf("failed to initialize messaging host service: %s", err.Error())) + return err + } else { + h.log.Debug("initialized messaging host service") + } + + h.object, err = hostservices.NewObjectStoreService(h.nc, h.log) + if err != nil { + h.log.Error(fmt.Sprintf("failed to initialize object store host service: %s", err.Error())) + return err + } else { + h.log.Debug("initialized object store host service") + } + + // agentint.{vmID}.rpc.{namespace}.{service}.{method} + _, err = h.ncint.Subscribe("agentint.*.rpc.*.*.*", h.handleRPC) + if err != nil { + return err + } + + return nil +} + +func (h *HostServices) handleRPC(msg *nats.Msg) { + // agentint.{vmID}.rpc.{namespace}.{service}.{method} + tokens := strings.Split(msg.Subject, ".") + vmID := tokens[1] + namespace := tokens[3] + service := tokens[4] + method := tokens[5] + + _, ok := h.mgr.allVMs[vmID] + if !ok { + h.log.Warn("Received a host services RPC request from an unknown VM.") + resp, _ := json.Marshal(map[string]interface{}{ + "error": "unknown vm", + }) + + err := msg.Respond(resp) + if err != nil { + h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + h.log.Debug("Received host services RPC request", + slog.String("vmid", vmID), + slog.String("namespace", namespace), + slog.String("service", service), + slog.String("method", method), + slog.Int("payload_size", len(msg.Data)), + ) + + switch service { + case hostServiceHTTP: + h.http.HandleRPC(msg) + case hostServiceKeyValue: + h.kv.HandleRPC(msg) + case hostServiceMessaging: + h.messaging.HandleRPC(msg) + case hostServiceObjectStore: + h.object.HandleRPC(msg) + default: + h.log.Warn("Received invalid host services RPC request", + slog.String("service", service), + slog.String("method", method), + ) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": "invalid rpc request", + }) + + err := msg.Respond(resp) + if err != nil { + h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + } +} diff --git a/internal/node/services/api.go b/internal/node/services/api.go new file mode 100644 index 00000000..2113927a --- /dev/null +++ b/internal/node/services/api.go @@ -0,0 +1,7 @@ +package services + +import "github.com/nats-io/nats.go" + +type HostService interface { + HandleRPC(msg *nats.Msg) +} diff --git a/internal/node/services/lib/http.go b/internal/node/services/lib/http.go new file mode 100644 index 00000000..c724a89b --- /dev/null +++ b/internal/node/services/lib/http.go @@ -0,0 +1,51 @@ +package lib + +import ( + "log/slog" + "strings" + + "github.com/nats-io/nats.go" +) + +// HTTP client operations available: +// Request (payload contains method, headers, etc) + +type HTTPService struct { + log *slog.Logger + nc *nats.Conn +} + +func NewHTTPService(nc *nats.Conn, log *slog.Logger) (*HTTPService, error) { + http := &HTTPService{ + log: log, + nc: nc, + } + + err := http.init() + if err != nil { + return nil, err + } + + return http, nil +} + +func (h *HTTPService) init() error { + return nil +} + +func (h *HTTPService) HandleRPC(msg *nats.Msg) { + // agentint.{vmID}.rpc.{namespace}.{service}.{method} + tokens := strings.Split(msg.Subject, ".") + service := tokens[4] + method := tokens[5] + + switch method { + default: + h.log.Warn("Received invalid host services RPC request", + slog.String("service", service), + slog.String("method", method), + ) + + // msg.Respond() + } +} diff --git a/internal/node/services/lib/keyvalue.go b/internal/node/services/lib/keyvalue.go new file mode 100644 index 00000000..1cf24938 --- /dev/null +++ b/internal/node/services/lib/keyvalue.go @@ -0,0 +1,324 @@ +package lib + +import ( + "encoding/json" + "errors" + "fmt" + "log/slog" + "strings" + + "github.com/nats-io/nats.go" + agentapi "github.com/synadia-io/nex/internal/agent-api" +) + +const kvServiceMethodGet = "get" +const kvServiceMethodSet = "set" +const kvServiceMethodDelete = "delete" +const kvServiceMethodKeys = "keys" + +type KeyValueService struct { + log *slog.Logger + nc *nats.Conn +} + +func NewKeyValueService(nc *nats.Conn, log *slog.Logger) (*KeyValueService, error) { + kv := &KeyValueService{ + log: log, + nc: nc, + } + + err := kv.init() + if err != nil { + return nil, err + } + + return kv, nil +} + +func (k *KeyValueService) init() error { + return nil +} + +func (k *KeyValueService) HandleRPC(msg *nats.Msg) { + // agentint.{vmID}.rpc.{namespace}.{service}.{method} + tokens := strings.Split(msg.Subject, ".") + service := tokens[4] + method := tokens[5] + + switch method { + case kvServiceMethodGet: + k.handleKeyValueGet(msg) + case kvServiceMethodSet: + k.handleKeyValueSet(msg) + case kvServiceMethodDelete: + k.handleKeyValueDelete(msg) + case kvServiceMethodKeys: + k.handleKeyValueKeys(msg) + default: + k.log.Warn("Received invalid host services RPC request", + slog.String("service", service), + slog.String("method", method), + ) + + // msg.Respond() + } +} + +func (k *KeyValueService) handleKeyValueGet(msg *nats.Msg) { + tokens := strings.Split(msg.Subject, ".") + vmID := tokens[1] + namespace := tokens[3] + + kvStore, err := k.resolveKeyValueStore(vmID, namespace) // FIXME-- should this be worload name + namespace + if err != nil { + k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) + } + + var req *agentapi.HostServicesKeyValueRequest + err = json.Unmarshal(msg.Data, &req) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + if req.Key == nil { + resp, _ := json.Marshal(map[string]interface{}{ + "error": "key is required", + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + entry, err := kvStore.Get(*req.Key) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to get value for key %s: %s", *req.Key, err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to delete keys %s: %s", *req.Key, err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + val := json.RawMessage(entry.Value()) + resp, _ := json.Marshal(&agentapi.HostServicesKeyValueRequest{ + Key: req.Key, + Value: &val, + }) + err = msg.Respond(resp) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to respond to key/value host service request: %s", err.Error())) + } +} + +func (k *KeyValueService) handleKeyValueSet(msg *nats.Msg) { + tokens := strings.Split(msg.Subject, ".") + vmID := tokens[1] + namespace := tokens[3] + + kvStore, err := k.resolveKeyValueStore(vmID, namespace) // FIXME-- should this be worload name + namespace + if err != nil { + k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) + } + + var req *agentapi.HostServicesKeyValueRequest + err = json.Unmarshal(msg.Data, &req) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + // FIXME-- add req.Validate() + if req.Key == nil { + resp, _ := json.Marshal(map[string]interface{}{ + "error": "key is required", + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + // FIXME-- add req.Validate() + if req.Value == nil { + resp, _ := json.Marshal(map[string]interface{}{ + "error": "value is required", + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + revision, err := kvStore.Put(*req.Key, *req.Value) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to write %d-byte value for key %s: %s", len(*req.Value), *req.Key, err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to write %d-byte value for key %s: %s", len(*req.Value), *req.Key, err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + resp, _ := json.Marshal(map[string]interface{}{ + "revision": revision, + "success": true, + }) + err = msg.Respond(resp) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to respond to key/value host service request: %s", err.Error())) + } +} + +func (k *KeyValueService) handleKeyValueDelete(msg *nats.Msg) { + tokens := strings.Split(msg.Subject, ".") + vmID := tokens[1] + namespace := tokens[3] + + kvStore, err := k.resolveKeyValueStore(vmID, namespace) // FIXME-- should this be worload name + namespace + if err != nil { + k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) + } + + var req *agentapi.HostServicesKeyValueRequest + err = json.Unmarshal(msg.Data, &req) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + if req.Key == nil { + resp, _ := json.Marshal(map[string]interface{}{ + "error": "key is required", + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + err = kvStore.Delete(*req.Key) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to delete key %s: %s", *req.Key, err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to delete keys %s: %s", *req.Key, err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + resp, _ := json.Marshal(map[string]interface{}{ + "success": true, + }) + err = msg.Respond(resp) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to respond to key/value host service request: %s", err.Error())) + } +} + +func (k *KeyValueService) handleKeyValueKeys(msg *nats.Msg) { + tokens := strings.Split(msg.Subject, ".") + vmID := tokens[1] + namespace := tokens[3] + + kvStore, err := k.resolveKeyValueStore(vmID, namespace) // FIXME-- should this be worload name + namespace + if err != nil { + k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) + } + + keys, err := kvStore.Keys() // TODO-- paginate... + if err != nil { + k.log.Warn(fmt.Sprintf("failed to respond to key/value host service request: %s", err.Error())) + + resp, _ := json.Marshal(map[string]interface{}{ + "error": fmt.Sprintf("failed to resolve keys: %s", err.Error()), + }) + + err := msg.Respond(resp) + if err != nil { + k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + + return + } + + resp, _ := json.Marshal(keys) + err = msg.Respond(resp) + if err != nil { + k.log.Warn(fmt.Sprintf("failed to respond to key/value host service request: %s", err.Error())) + } +} + +// resolve the key value store for this workload; initialize it if necessary +func (k *KeyValueService) resolveKeyValueStore(namespace, workload string) (nats.KeyValue, error) { + js, err := k.nc.JetStream() + if err != nil { + return nil, err + } + + kvStoreName := fmt.Sprintf("hs_%s_%s_kv", namespace, workload) + kvStore, err := js.KeyValue(kvStoreName) + if err != nil { + if errors.Is(err, nats.ErrBucketNotFound) { + kvStore, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: kvStoreName}) + if err != nil { + return nil, err + } + } else { + return nil, err + } + } + + return kvStore, nil +} diff --git a/internal/node/services/lib/messaging.go b/internal/node/services/lib/messaging.go new file mode 100644 index 00000000..2f39eaf4 --- /dev/null +++ b/internal/node/services/lib/messaging.go @@ -0,0 +1,53 @@ +package lib + +import ( + "log/slog" + "strings" + + "github.com/nats-io/nats.go" +) + +// Messaging operations available: +// Publish +// Request +// RequestMany + +type MessagingService struct { + log *slog.Logger + nc *nats.Conn +} + +func NewMessagingService(nc *nats.Conn, log *slog.Logger) (*MessagingService, error) { + messaging := &MessagingService{ + log: log, + nc: nc, + } + + err := messaging.init() + if err != nil { + return nil, err + } + + return messaging, nil +} + +func (m *MessagingService) init() error { + return nil +} + +func (m *MessagingService) HandleRPC(msg *nats.Msg) { + // agentint.{vmID}.rpc.{namespace}.{service}.{method} + tokens := strings.Split(msg.Subject, ".") + service := tokens[4] + method := tokens[5] + + switch method { + default: + m.log.Warn("Received invalid host services RPC request", + slog.String("service", service), + slog.String("method", method), + ) + + // msg.Respond() + } +} diff --git a/internal/node/services/lib/objectstore.go b/internal/node/services/lib/objectstore.go new file mode 100644 index 00000000..066f95c9 --- /dev/null +++ b/internal/node/services/lib/objectstore.go @@ -0,0 +1,54 @@ +package lib + +import ( + "log/slog" + "strings" + + "github.com/nats-io/nats.go" +) + +// Object store operations available: +// PutChunk +// GetChunk +// Delete +// Keys (this can fail beyond some upper limit and/or require some form of paging) + +type ObjectStoreService struct { + log *slog.Logger + nc *nats.Conn +} + +func NewObjectStoreService(nc *nats.Conn, log *slog.Logger) (*ObjectStoreService, error) { + objectStore := &ObjectStoreService{ + log: log, + nc: nc, + } + + err := objectStore.init() + if err != nil { + return nil, err + } + + return objectStore, nil +} + +func (o *ObjectStoreService) init() error { + return nil +} + +func (o *ObjectStoreService) HandleRPC(msg *nats.Msg) { + // agentint.{vmID}.rpc.{namespace}.{service}.{method} + tokens := strings.Split(msg.Subject, ".") + service := tokens[4] + method := tokens[5] + + switch method { + default: + o.log.Warn("Received invalid host services RPC request", + slog.String("service", service), + slog.String("method", method), + ) + + // msg.Respond() + } +}