From 3d22236c9ddc0218e5d1a50c7ec09af092b9cbb8 Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Tue, 20 Aug 2024 14:58:30 -0400 Subject: [PATCH 1/2] add in-memory host services for testing --- host-services/builtins/objectstore.go | 2 + host-services/inmem/keyvalue.go | 118 +++++++++++++++++++++ host-services/inmem/objectstore.go | 143 ++++++++++++++++++++++++++ test/inmem_hostservices/main.go | 46 +++++++++ 4 files changed, 309 insertions(+) create mode 100644 host-services/inmem/keyvalue.go create mode 100644 host-services/inmem/objectstore.go create mode 100644 test/inmem_hostservices/main.go diff --git a/host-services/builtins/objectstore.go b/host-services/builtins/objectstore.go index c504dc5b..f57f299d 100644 --- a/host-services/builtins/objectstore.go +++ b/host-services/builtins/objectstore.go @@ -109,6 +109,8 @@ func (o *ObjectStoreService) HandleRequest( } } +// TODO: stop bleeding jetstream ObjectInfo type in the JSON payload +// and use a more abstract, host-services type func (o *ObjectStoreService) handleGet( ctx context.Context, objectStore jetstream.ObjectStore, diff --git a/host-services/inmem/keyvalue.go b/host-services/inmem/keyvalue.go new file mode 100644 index 00000000..1514fbe2 --- /dev/null +++ b/host-services/inmem/keyvalue.go @@ -0,0 +1,118 @@ +package inmem + +import ( + "encoding/json" + "log/slog" + + "github.com/nats-io/nats.go" + hostservices "github.com/synadia-io/nex/host-services" + agentapi "github.com/synadia-io/nex/internal/agent-api" +) + +const kvServiceMethodGet = "get" +const kvServiceMethodSet = "set" +const kvServiceMethodDelete = "delete" +const kvServiceMethodKeys = "keys" + +type InmemKeyValue struct { + log *slog.Logger + kvstore map[string][]byte +} + +func NewInmemKeyValueService(log *slog.Logger) *InmemKeyValue { + return &InmemKeyValue{ + log: log, + kvstore: make(map[string][]byte), + } +} + +func (k *InmemKeyValue) Initialize(_ json.RawMessage) error { + return nil +} + +func (k *InmemKeyValue) HandleRequest( + conns map[string]*nats.Conn, + namespace string, + workloadId string, + method string, + workloadName string, + metadata map[string]string, + request []byte, +) (hostservices.ServiceResult, error) { + + switch method { + case kvServiceMethodGet: + return k.handleGet(metadata) + case kvServiceMethodSet: + return k.handleSet(request, metadata) + case kvServiceMethodDelete: + return k.handleDelete(metadata) + case kvServiceMethodKeys: + return k.handleKeys() + default: + k.log.Warn("Received invalid host services RPC request", + slog.String("service", "kv"), + slog.String("method", method), + ) + return hostservices.ServiceResultFail(400, "Received invalid host services RPC request"), nil + } +} + +func (k *InmemKeyValue) handleGet( + metadata map[string]string, +) (hostservices.ServiceResult, error) { + + key := metadata[agentapi.KeyValueKeyHeader] + if key == "" { + return hostservices.ServiceResultFail(400, "key is required"), nil + } + if val, ok := k.kvstore[key]; ok { + return hostservices.ServiceResultPass(200, "", val), nil + } + return hostservices.ServiceResultFail(404, "no such key"), nil +} + +func (k *InmemKeyValue) handleSet( + data []byte, + metadata map[string]string, +) (hostservices.ServiceResult, error) { + + key := metadata[agentapi.KeyValueKeyHeader] + if key == "" { + return hostservices.ServiceResultFail(400, "key is required"), nil + } + k.kvstore[key] = data + resp, _ := json.Marshal(map[string]interface{}{ + "revision": 1, + "success": true, + }) + return hostservices.ServiceResultPass(200, "", resp), nil +} + +func (k *InmemKeyValue) handleDelete( + metadata map[string]string, +) (hostservices.ServiceResult, error) { + + key := metadata[agentapi.KeyValueKeyHeader] + if key == "" { + return hostservices.ServiceResultFail(400, "key is required"), nil + } + + delete(k.kvstore, key) + + resp, _ := json.Marshal(map[string]interface{}{ + "success": true, + }) + return hostservices.ServiceResultPass(200, "", resp), nil +} + +func (k *InmemKeyValue) handleKeys() (hostservices.ServiceResult, error) { + keys := make([]string, 0) + for key := range k.kvstore { + keys = append(keys, key) + } + + resp, _ := json.Marshal(keys) + + return hostservices.ServiceResultPass(200, "", resp), nil +} diff --git a/host-services/inmem/objectstore.go b/host-services/inmem/objectstore.go new file mode 100644 index 00000000..a67df554 --- /dev/null +++ b/host-services/inmem/objectstore.go @@ -0,0 +1,143 @@ +package inmem + +import ( + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "log/slog" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + hostservices "github.com/synadia-io/nex/host-services" + agentapi "github.com/synadia-io/nex/internal/agent-api" +) + +const ( + objectStoreServiceMethodGet = "get" + objectStoreServiceMethodPut = "put" + objectStoreServiceMethodDelete = "delete" + objectStoreServiceMethodList = "list" +) + +type InmemObjectStore struct { + log *slog.Logger + objects map[string][]byte +} + +func NewInmemObjectStore(log *slog.Logger) *InmemObjectStore { + return &InmemObjectStore{ + objects: make(map[string][]byte), + log: log, + } +} + +func (o *InmemObjectStore) Initialize(_ json.RawMessage) error { + return nil +} + +func (o *InmemObjectStore) HandleRequest( + conns map[string]*nats.Conn, + namespace string, + workloadId string, + method string, + workloadName string, + metadata map[string]string, + request []byte) (hostservices.ServiceResult, error) { + + switch method { + case objectStoreServiceMethodGet: + return o.handleGet(metadata) + case objectStoreServiceMethodPut: + return o.handlePut(request, metadata) + case objectStoreServiceMethodDelete: + return o.handleDelete(metadata) + case objectStoreServiceMethodList: + return o.handleList() + default: + o.log.Warn("Received invalid host services RPC request", + slog.String("service", "objectstore"), + slog.String("method", method), + ) + return hostservices.ServiceResultFail(400, "unknown method"), nil + } +} + +func (o *InmemObjectStore) handleGet(metadata map[string]string) (hostservices.ServiceResult, error) { + name := metadata[agentapi.ObjectStoreObjectNameHeader] + if name == "" { + return hostservices.ServiceResultFail(400, "name is required"), nil + } + + if object, ok := o.objects[name]; ok { + return hostservices.ServiceResultPass(200, "", object), nil + } + + return hostservices.ServiceResultFail(404, "no such object"), nil +} + +func (o *InmemObjectStore) handlePut(request []byte, metadata map[string]string) (hostservices.ServiceResult, error) { + name := metadata[agentapi.ObjectStoreObjectNameHeader] + if name == "" { + return hostservices.ServiceResultFail(400, "name is required"), nil + } + + o.objects[name] = request + + res := jetstream.ObjectInfo{ + ObjectMeta: jetstream.ObjectMeta{ + Name: name, + Description: "In-memory object", + }, + Bucket: "bucket", + Size: uint64(len(request)), + Digest: makeDigest(request), + } + output, _ := json.Marshal(res) + + return hostservices.ServiceResultPass(200, "", output), nil +} + +func (o *InmemObjectStore) handleDelete( + metadata map[string]string, +) (hostservices.ServiceResult, error) { + + name := metadata[agentapi.ObjectStoreObjectNameHeader] + if name == "" { + return hostservices.ServiceResultFail(400, "name is required"), nil + } + + delete(o.objects, name) + + resp, _ := json.Marshal(&agentapi.HostServicesObjectStoreResponse{ + Success: true, + }) + return hostservices.ServiceResultPass(200, "", resp), nil +} + +func (o *InmemObjectStore) handleList() (hostservices.ServiceResult, error) { + output := make([]jetstream.ObjectInfo, 0) + + for k, v := range o.objects { + output = append(output, jetstream.ObjectInfo{ + ObjectMeta: jetstream.ObjectMeta{ + Name: k, + Description: "In-memory object", + }, + Bucket: "bucket", + Size: uint64(len(v)), + Digest: makeDigest(v), + }) + } + resp, _ := json.Marshal(output) + return hostservices.ServiceResultPass(200, "", resp), nil +} + +// NOTE : digest comes back from NATS as a sha-256 hash base64-encoded in the form SHA-256={hash} +func makeDigest(input []byte) string { + s := sha256.New() + s.Write(input) + hs := s.Sum(nil) + + return fmt.Sprintf("SHA-256=%s", base64.URLEncoding.EncodeToString(hs)) +} diff --git a/test/inmem_hostservices/main.go b/test/inmem_hostservices/main.go new file mode 100644 index 00000000..cda31573 --- /dev/null +++ b/test/inmem_hostservices/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "log/slog" + + "disorder.dev/shandler" + "github.com/nats-io/nats.go" + hostservices "github.com/synadia-io/nex/host-services" + "github.com/synadia-io/nex/host-services/builtins" + "github.com/synadia-io/nex/host-services/inmem" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace/noop" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nc, err := nats.Connect("0.0.0.0:4222") + if err != nil { + panic(err) + } + + noopProvider := noop.NewTracerProvider() + otel.SetTracerProvider(noopProvider) + t := otel.Tracer("") + + var handlerOpts []shandler.HandlerOption + handlerOpts = append(handlerOpts, shandler.WithLogLevel(slog.LevelDebug)) + log := slog.New(shandler.NewHandler(handlerOpts...)) + + hsServer := hostservices.NewHostServicesServer(nc, log, t) + http, _ := builtins.NewHTTPService(log) + messaging, _ := builtins.NewMessagingService(log) + kv := inmem.NewInmemKeyValueService(log) + obj := inmem.NewInmemObjectStore(log) + + hsServer.AddService("http", http, []byte{}) + hsServer.AddService("messaging", messaging, []byte{}) + hsServer.AddService("kv", kv, []byte{}) + hsServer.AddService("objectstore", obj, []byte{}) + + _ = hsServer.Start() + + <-ctx.Done() +} From 88cba91617622ac75d8ef7ada2ad40b7cf4761b4 Mon Sep 17 00:00:00 2001 From: Kevin Hoffman Date: Tue, 20 Aug 2024 15:03:30 -0400 Subject: [PATCH 2/2] lint --- test/inmem_hostservices/main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/inmem_hostservices/main.go b/test/inmem_hostservices/main.go index cda31573..c4631255 100644 --- a/test/inmem_hostservices/main.go +++ b/test/inmem_hostservices/main.go @@ -35,10 +35,10 @@ func main() { kv := inmem.NewInmemKeyValueService(log) obj := inmem.NewInmemObjectStore(log) - hsServer.AddService("http", http, []byte{}) - hsServer.AddService("messaging", messaging, []byte{}) - hsServer.AddService("kv", kv, []byte{}) - hsServer.AddService("objectstore", obj, []byte{}) + _ = hsServer.AddService("http", http, []byte{}) + _ = hsServer.AddService("messaging", messaging, []byte{}) + _ = hsServer.AddService("kv", kv, []byte{}) + _ = hsServer.AddService("objectstore", obj, []byte{}) _ = hsServer.Start()