Skip to content

Commit

Permalink
Add initial node host services impl for key/value
Browse files Browse the repository at this point in the history
  • Loading branch information
kthomas committed Feb 15, 2024
1 parent 17f4b45 commit 7eaaab2
Show file tree
Hide file tree
Showing 8 changed files with 645 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/agent-api/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agentapi

import (
"encoding/json"
"errors"
"io"
"strings"
Expand Down Expand Up @@ -119,6 +120,11 @@ type HandshakeRequest struct {
Message *string `json:"message,omitempty"`
}

type HostServicesKeyValueRequest struct {
Key *string `json:"key"`
Value *json.RawMessage `json:"value,omitempty"`
}

type MachineMetadata struct {
VmID *string `json:"vmid"`
NodeNatsHost *string `json:"node_nats_host"`
Expand Down
9 changes: 9 additions & 0 deletions internal/node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type MachineManager struct {
handshakes map[string]string
handshakeTimeout time.Duration // TODO: make configurable...

hostServices *HostServices

natsStoreDir string
publicKey string
}
Expand Down Expand Up @@ -104,6 +106,13 @@ func NewMachineManager(
return nil, err
}

m.hostServices = NewHostServices(m, m.nc, m.ncInternal, m.log)
err = m.hostServices.init()
if err != nil {
m.log.Warn("Failed to initialize host services.", slog.Any("err", err))
return nil, err
}

return m, nil
}

Expand Down
141 changes: 141 additions & 0 deletions internal/node/services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package nexnode

import (
"encoding/json"
"fmt"
"log/slog"
"strings"

"github.com/nats-io/nats.go"
"github.com/synadia-io/nex/internal/node/services"
hostservices "github.com/synadia-io/nex/internal/node/services/lib"
)

const hostServiceHTTP = "http"
const hostServiceKeyValue = "kv"
const hostServiceMessaging = "messaging"
const hostServiceObjectStore = "objectstore"

// Host services server implements select functionality which is
// exposed to workloads by way of the agent which makes RPC calls
// via the internal NATS connection
type HostServices struct {
log *slog.Logger
mgr *MachineManager
nc *nats.Conn
ncint *nats.Conn

http services.HostService
kv services.HostService
messaging services.HostService
object services.HostService
}

func NewHostServices(mgr *MachineManager, nc, ncint *nats.Conn, log *slog.Logger) *HostServices {
return &HostServices{
log: log,
mgr: mgr,
nc: nc,
ncint: ncint,
}
}

func (h *HostServices) init() error {
var err error

h.http, err = hostservices.NewHTTPService(h.nc, h.log)
if err != nil {
h.log.Error(fmt.Sprintf("failed to initialize http host service: %s", err.Error()))
return err
} else {
h.log.Debug("initialized http host service")
}

h.kv, err = hostservices.NewKeyValueService(h.nc, h.log)
if err != nil {
h.log.Error(fmt.Sprintf("failed to initialize key/value host service: %s", err.Error()))
return err
} else {
h.log.Debug("initialized key/value host service")
}

h.messaging, err = hostservices.NewMessagingService(h.nc, h.log)
if err != nil {
h.log.Error(fmt.Sprintf("failed to initialize messaging host service: %s", err.Error()))
return err
} else {
h.log.Debug("initialized messaging host service")
}

h.object, err = hostservices.NewObjectStoreService(h.nc, h.log)
if err != nil {
h.log.Error(fmt.Sprintf("failed to initialize object store host service: %s", err.Error()))
return err
} else {
h.log.Debug("initialized object store host service")
}

// agentint.{vmID}.rpc.{namespace}.{service}.{method}
_, err = h.ncint.Subscribe("agentint.*.rpc.*.*.*", h.handleRPC)
if err != nil {
return err
}

return nil
}

func (h *HostServices) handleRPC(msg *nats.Msg) {
// agentint.{vmID}.rpc.{namespace}.{service}.{method}
tokens := strings.Split(msg.Subject, ".")
vmID := tokens[1]
namespace := tokens[3]
service := tokens[4]
method := tokens[5]

_, ok := h.mgr.allVMs[vmID]
if !ok {
h.log.Warn("Received a host services RPC request from an unknown VM.")
resp, _ := json.Marshal(map[string]interface{}{
"error": "unknown vm",
})

err := msg.Respond(resp)
if err != nil {
h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error()))
}
return
}

h.log.Debug("Received host services RPC request",
slog.String("vmid", vmID),
slog.String("namespace", namespace),
slog.String("service", service),
slog.String("method", method),
slog.Int("payload_size", len(msg.Data)),
)

switch service {
case hostServiceHTTP:
h.http.HandleRPC(msg)
case hostServiceKeyValue:
h.kv.HandleRPC(msg)
case hostServiceMessaging:
h.messaging.HandleRPC(msg)
case hostServiceObjectStore:
h.object.HandleRPC(msg)
default:
h.log.Warn("Received invalid host services RPC request",
slog.String("service", service),
slog.String("method", method),
)

resp, _ := json.Marshal(map[string]interface{}{
"error": "invalid rpc request",
})

err := msg.Respond(resp)
if err != nil {
h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error()))
}
}
}
7 changes: 7 additions & 0 deletions internal/node/services/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package services

import "github.com/nats-io/nats.go"

type HostService interface {
HandleRPC(msg *nats.Msg)
}
51 changes: 51 additions & 0 deletions internal/node/services/lib/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package lib

import (
"log/slog"
"strings"

"github.com/nats-io/nats.go"
)

// HTTP client operations available:
// Request (payload contains method, headers, etc)

type HTTPService struct {
log *slog.Logger
nc *nats.Conn
}

func NewHTTPService(nc *nats.Conn, log *slog.Logger) (*HTTPService, error) {
http := &HTTPService{
log: log,
nc: nc,
}

err := http.init()
if err != nil {
return nil, err
}

return http, nil
}

func (h *HTTPService) init() error {
return nil
}

func (h *HTTPService) HandleRPC(msg *nats.Msg) {
// agentint.{vmID}.rpc.{namespace}.{service}.{method}
tokens := strings.Split(msg.Subject, ".")
service := tokens[4]
method := tokens[5]

switch method {
default:
h.log.Warn("Received invalid host services RPC request",
slog.String("service", service),
slog.String("method", method),
)

// msg.Respond()
}
}
Loading

0 comments on commit 7eaaab2

Please sign in to comment.