Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add in-memory host services for testing #365

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions host-services/builtins/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func (o *ObjectStoreService) HandleRequest(
}
}

// TODO: stop bleeding jetstream ObjectInfo type in the JSON payload
// and use a more abstract, host-services type
func (o *ObjectStoreService) handleGet(
ctx context.Context,
objectStore jetstream.ObjectStore,
Expand Down
118 changes: 118 additions & 0 deletions host-services/inmem/keyvalue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package inmem

import (
"encoding/json"
"log/slog"

"github.com/nats-io/nats.go"
hostservices "github.com/synadia-io/nex/host-services"
agentapi "github.com/synadia-io/nex/internal/agent-api"
)

const kvServiceMethodGet = "get"
const kvServiceMethodSet = "set"
const kvServiceMethodDelete = "delete"
const kvServiceMethodKeys = "keys"

type InmemKeyValue struct {
log *slog.Logger
kvstore map[string][]byte
}

func NewInmemKeyValueService(log *slog.Logger) *InmemKeyValue {
return &InmemKeyValue{
log: log,
kvstore: make(map[string][]byte),
}
}

func (k *InmemKeyValue) Initialize(_ json.RawMessage) error {
return nil
}

func (k *InmemKeyValue) HandleRequest(
conns map[string]*nats.Conn,
namespace string,
workloadId string,
method string,
workloadName string,
metadata map[string]string,
request []byte,
) (hostservices.ServiceResult, error) {

switch method {
case kvServiceMethodGet:
return k.handleGet(metadata)
case kvServiceMethodSet:
return k.handleSet(request, metadata)
case kvServiceMethodDelete:
return k.handleDelete(metadata)
case kvServiceMethodKeys:
return k.handleKeys()
default:
k.log.Warn("Received invalid host services RPC request",
slog.String("service", "kv"),
slog.String("method", method),
)
return hostservices.ServiceResultFail(400, "Received invalid host services RPC request"), nil
}
}

func (k *InmemKeyValue) handleGet(
metadata map[string]string,
) (hostservices.ServiceResult, error) {

key := metadata[agentapi.KeyValueKeyHeader]
if key == "" {
return hostservices.ServiceResultFail(400, "key is required"), nil
}
if val, ok := k.kvstore[key]; ok {
return hostservices.ServiceResultPass(200, "", val), nil
}
return hostservices.ServiceResultFail(404, "no such key"), nil
}

func (k *InmemKeyValue) handleSet(
data []byte,
metadata map[string]string,
) (hostservices.ServiceResult, error) {

key := metadata[agentapi.KeyValueKeyHeader]
if key == "" {
return hostservices.ServiceResultFail(400, "key is required"), nil
}
k.kvstore[key] = data
resp, _ := json.Marshal(map[string]interface{}{
"revision": 1,
"success": true,
})
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (k *InmemKeyValue) handleDelete(
metadata map[string]string,
) (hostservices.ServiceResult, error) {

key := metadata[agentapi.KeyValueKeyHeader]
if key == "" {
return hostservices.ServiceResultFail(400, "key is required"), nil
}

delete(k.kvstore, key)

resp, _ := json.Marshal(map[string]interface{}{
"success": true,
})
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (k *InmemKeyValue) handleKeys() (hostservices.ServiceResult, error) {
keys := make([]string, 0)
for key := range k.kvstore {
keys = append(keys, key)
}

resp, _ := json.Marshal(keys)

return hostservices.ServiceResultPass(200, "", resp), nil
}
143 changes: 143 additions & 0 deletions host-services/inmem/objectstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package inmem

import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"log/slog"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
hostservices "github.com/synadia-io/nex/host-services"
agentapi "github.com/synadia-io/nex/internal/agent-api"
)

const (
objectStoreServiceMethodGet = "get"
objectStoreServiceMethodPut = "put"
objectStoreServiceMethodDelete = "delete"
objectStoreServiceMethodList = "list"
)

type InmemObjectStore struct {
log *slog.Logger
objects map[string][]byte
}

func NewInmemObjectStore(log *slog.Logger) *InmemObjectStore {
return &InmemObjectStore{
objects: make(map[string][]byte),
log: log,
}
}

func (o *InmemObjectStore) Initialize(_ json.RawMessage) error {
return nil
}

func (o *InmemObjectStore) HandleRequest(
conns map[string]*nats.Conn,
namespace string,
workloadId string,
method string,
workloadName string,
metadata map[string]string,
request []byte) (hostservices.ServiceResult, error) {

switch method {
case objectStoreServiceMethodGet:
return o.handleGet(metadata)
case objectStoreServiceMethodPut:
return o.handlePut(request, metadata)
case objectStoreServiceMethodDelete:
return o.handleDelete(metadata)
case objectStoreServiceMethodList:
return o.handleList()
default:
o.log.Warn("Received invalid host services RPC request",
slog.String("service", "objectstore"),
slog.String("method", method),
)
return hostservices.ServiceResultFail(400, "unknown method"), nil
}
}

func (o *InmemObjectStore) handleGet(metadata map[string]string) (hostservices.ServiceResult, error) {
name := metadata[agentapi.ObjectStoreObjectNameHeader]
if name == "" {
return hostservices.ServiceResultFail(400, "name is required"), nil
}

if object, ok := o.objects[name]; ok {
return hostservices.ServiceResultPass(200, "", object), nil
}

return hostservices.ServiceResultFail(404, "no such object"), nil
}

func (o *InmemObjectStore) handlePut(request []byte, metadata map[string]string) (hostservices.ServiceResult, error) {
name := metadata[agentapi.ObjectStoreObjectNameHeader]
if name == "" {
return hostservices.ServiceResultFail(400, "name is required"), nil
}

o.objects[name] = request

res := jetstream.ObjectInfo{
ObjectMeta: jetstream.ObjectMeta{
Name: name,
Description: "In-memory object",
},
Bucket: "bucket",
Size: uint64(len(request)),
Digest: makeDigest(request),
}
output, _ := json.Marshal(res)

return hostservices.ServiceResultPass(200, "", output), nil
}

func (o *InmemObjectStore) handleDelete(
metadata map[string]string,
) (hostservices.ServiceResult, error) {

name := metadata[agentapi.ObjectStoreObjectNameHeader]
if name == "" {
return hostservices.ServiceResultFail(400, "name is required"), nil
}

delete(o.objects, name)

resp, _ := json.Marshal(&agentapi.HostServicesObjectStoreResponse{
Success: true,
})
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (o *InmemObjectStore) handleList() (hostservices.ServiceResult, error) {
output := make([]jetstream.ObjectInfo, 0)

for k, v := range o.objects {
output = append(output, jetstream.ObjectInfo{
ObjectMeta: jetstream.ObjectMeta{
Name: k,
Description: "In-memory object",
},
Bucket: "bucket",
Size: uint64(len(v)),
Digest: makeDigest(v),
})
}
resp, _ := json.Marshal(output)
return hostservices.ServiceResultPass(200, "", resp), nil
}

// NOTE : digest comes back from NATS as a sha-256 hash base64-encoded in the form SHA-256={hash}
func makeDigest(input []byte) string {
s := sha256.New()
s.Write(input)
hs := s.Sum(nil)

return fmt.Sprintf("SHA-256=%s", base64.URLEncoding.EncodeToString(hs))
}
46 changes: 46 additions & 0 deletions test/inmem_hostservices/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"context"
"log/slog"

"disorder.dev/shandler"
"github.com/nats-io/nats.go"
hostservices "github.com/synadia-io/nex/host-services"
"github.com/synadia-io/nex/host-services/builtins"
"github.com/synadia-io/nex/host-services/inmem"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace/noop"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nc, err := nats.Connect("0.0.0.0:4222")
if err != nil {
panic(err)
}

noopProvider := noop.NewTracerProvider()
otel.SetTracerProvider(noopProvider)
t := otel.Tracer("")

var handlerOpts []shandler.HandlerOption
handlerOpts = append(handlerOpts, shandler.WithLogLevel(slog.LevelDebug))
log := slog.New(shandler.NewHandler(handlerOpts...))

hsServer := hostservices.NewHostServicesServer(nc, log, t)
http, _ := builtins.NewHTTPService(log)
messaging, _ := builtins.NewMessagingService(log)
kv := inmem.NewInmemKeyValueService(log)
obj := inmem.NewInmemObjectStore(log)

_ = hsServer.AddService("http", http, []byte{})
_ = hsServer.AddService("messaging", messaging, []byte{})
_ = hsServer.AddService("kv", kv, []byte{})
_ = hsServer.AddService("objectstore", obj, []byte{})

_ = hsServer.Start()

<-ctx.Done()
}
Loading