Skip to content

Commit

Permalink
add in-memory host services for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidaddict committed Aug 20, 2024
1 parent 081c831 commit 3d22236
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 0 deletions.
2 changes: 2 additions & 0 deletions host-services/builtins/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func (o *ObjectStoreService) HandleRequest(
}
}

// TODO: stop bleeding jetstream ObjectInfo type in the JSON payload
// and use a more abstract, host-services type
func (o *ObjectStoreService) handleGet(
ctx context.Context,
objectStore jetstream.ObjectStore,
Expand Down
118 changes: 118 additions & 0 deletions host-services/inmem/keyvalue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package inmem

import (
"encoding/json"
"log/slog"

"github.com/nats-io/nats.go"
hostservices "github.com/synadia-io/nex/host-services"
agentapi "github.com/synadia-io/nex/internal/agent-api"
)

const kvServiceMethodGet = "get"
const kvServiceMethodSet = "set"
const kvServiceMethodDelete = "delete"
const kvServiceMethodKeys = "keys"

type InmemKeyValue struct {
log *slog.Logger
kvstore map[string][]byte
}

func NewInmemKeyValueService(log *slog.Logger) *InmemKeyValue {
return &InmemKeyValue{
log: log,
kvstore: make(map[string][]byte),
}
}

func (k *InmemKeyValue) Initialize(_ json.RawMessage) error {
return nil
}

func (k *InmemKeyValue) HandleRequest(
conns map[string]*nats.Conn,
namespace string,
workloadId string,
method string,
workloadName string,
metadata map[string]string,
request []byte,
) (hostservices.ServiceResult, error) {

switch method {
case kvServiceMethodGet:
return k.handleGet(metadata)
case kvServiceMethodSet:
return k.handleSet(request, metadata)
case kvServiceMethodDelete:
return k.handleDelete(metadata)
case kvServiceMethodKeys:
return k.handleKeys()
default:
k.log.Warn("Received invalid host services RPC request",
slog.String("service", "kv"),
slog.String("method", method),
)
return hostservices.ServiceResultFail(400, "Received invalid host services RPC request"), nil
}
}

func (k *InmemKeyValue) handleGet(
metadata map[string]string,
) (hostservices.ServiceResult, error) {

key := metadata[agentapi.KeyValueKeyHeader]
if key == "" {
return hostservices.ServiceResultFail(400, "key is required"), nil
}
if val, ok := k.kvstore[key]; ok {
return hostservices.ServiceResultPass(200, "", val), nil
}
return hostservices.ServiceResultFail(404, "no such key"), nil
}

func (k *InmemKeyValue) handleSet(
data []byte,
metadata map[string]string,
) (hostservices.ServiceResult, error) {

key := metadata[agentapi.KeyValueKeyHeader]
if key == "" {
return hostservices.ServiceResultFail(400, "key is required"), nil
}
k.kvstore[key] = data
resp, _ := json.Marshal(map[string]interface{}{
"revision": 1,
"success": true,
})
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (k *InmemKeyValue) handleDelete(
metadata map[string]string,
) (hostservices.ServiceResult, error) {

key := metadata[agentapi.KeyValueKeyHeader]
if key == "" {
return hostservices.ServiceResultFail(400, "key is required"), nil
}

delete(k.kvstore, key)

resp, _ := json.Marshal(map[string]interface{}{
"success": true,
})
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (k *InmemKeyValue) handleKeys() (hostservices.ServiceResult, error) {
keys := make([]string, 0)
for key := range k.kvstore {
keys = append(keys, key)
}

resp, _ := json.Marshal(keys)

return hostservices.ServiceResultPass(200, "", resp), nil
}
143 changes: 143 additions & 0 deletions host-services/inmem/objectstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package inmem

import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"log/slog"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
hostservices "github.com/synadia-io/nex/host-services"
agentapi "github.com/synadia-io/nex/internal/agent-api"
)

const (
objectStoreServiceMethodGet = "get"
objectStoreServiceMethodPut = "put"
objectStoreServiceMethodDelete = "delete"
objectStoreServiceMethodList = "list"
)

type InmemObjectStore struct {
log *slog.Logger
objects map[string][]byte
}

func NewInmemObjectStore(log *slog.Logger) *InmemObjectStore {
return &InmemObjectStore{
objects: make(map[string][]byte),
log: log,
}
}

func (o *InmemObjectStore) Initialize(_ json.RawMessage) error {
return nil
}

func (o *InmemObjectStore) HandleRequest(
conns map[string]*nats.Conn,
namespace string,
workloadId string,
method string,
workloadName string,
metadata map[string]string,
request []byte) (hostservices.ServiceResult, error) {

switch method {
case objectStoreServiceMethodGet:
return o.handleGet(metadata)
case objectStoreServiceMethodPut:
return o.handlePut(request, metadata)
case objectStoreServiceMethodDelete:
return o.handleDelete(metadata)
case objectStoreServiceMethodList:
return o.handleList()
default:
o.log.Warn("Received invalid host services RPC request",
slog.String("service", "objectstore"),
slog.String("method", method),
)
return hostservices.ServiceResultFail(400, "unknown method"), nil
}
}

func (o *InmemObjectStore) handleGet(metadata map[string]string) (hostservices.ServiceResult, error) {
name := metadata[agentapi.ObjectStoreObjectNameHeader]
if name == "" {
return hostservices.ServiceResultFail(400, "name is required"), nil
}

if object, ok := o.objects[name]; ok {
return hostservices.ServiceResultPass(200, "", object), nil
}

return hostservices.ServiceResultFail(404, "no such object"), nil
}

func (o *InmemObjectStore) handlePut(request []byte, metadata map[string]string) (hostservices.ServiceResult, error) {
name := metadata[agentapi.ObjectStoreObjectNameHeader]
if name == "" {
return hostservices.ServiceResultFail(400, "name is required"), nil
}

o.objects[name] = request

res := jetstream.ObjectInfo{
ObjectMeta: jetstream.ObjectMeta{
Name: name,
Description: "In-memory object",
},
Bucket: "bucket",
Size: uint64(len(request)),
Digest: makeDigest(request),
}
output, _ := json.Marshal(res)

return hostservices.ServiceResultPass(200, "", output), nil
}

func (o *InmemObjectStore) handleDelete(
metadata map[string]string,
) (hostservices.ServiceResult, error) {

name := metadata[agentapi.ObjectStoreObjectNameHeader]
if name == "" {
return hostservices.ServiceResultFail(400, "name is required"), nil
}

delete(o.objects, name)

resp, _ := json.Marshal(&agentapi.HostServicesObjectStoreResponse{
Success: true,
})
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (o *InmemObjectStore) handleList() (hostservices.ServiceResult, error) {
output := make([]jetstream.ObjectInfo, 0)

for k, v := range o.objects {
output = append(output, jetstream.ObjectInfo{
ObjectMeta: jetstream.ObjectMeta{
Name: k,
Description: "In-memory object",
},
Bucket: "bucket",
Size: uint64(len(v)),
Digest: makeDigest(v),
})
}
resp, _ := json.Marshal(output)
return hostservices.ServiceResultPass(200, "", resp), nil
}

// NOTE : digest comes back from NATS as a sha-256 hash base64-encoded in the form SHA-256={hash}
func makeDigest(input []byte) string {
s := sha256.New()
s.Write(input)
hs := s.Sum(nil)

return fmt.Sprintf("SHA-256=%s", base64.URLEncoding.EncodeToString(hs))
}
46 changes: 46 additions & 0 deletions test/inmem_hostservices/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"context"
"log/slog"

"disorder.dev/shandler"
"github.com/nats-io/nats.go"
hostservices "github.com/synadia-io/nex/host-services"
"github.com/synadia-io/nex/host-services/builtins"
"github.com/synadia-io/nex/host-services/inmem"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace/noop"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nc, err := nats.Connect("0.0.0.0:4222")
if err != nil {
panic(err)
}

noopProvider := noop.NewTracerProvider()
otel.SetTracerProvider(noopProvider)
t := otel.Tracer("")

var handlerOpts []shandler.HandlerOption
handlerOpts = append(handlerOpts, shandler.WithLogLevel(slog.LevelDebug))
log := slog.New(shandler.NewHandler(handlerOpts...))

hsServer := hostservices.NewHostServicesServer(nc, log, t)
http, _ := builtins.NewHTTPService(log)
messaging, _ := builtins.NewMessagingService(log)
kv := inmem.NewInmemKeyValueService(log)
obj := inmem.NewInmemObjectStore(log)

hsServer.AddService("http", http, []byte{})

Check failure on line 38 in test/inmem_hostservices/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `hsServer.AddService` is not checked (errcheck)
hsServer.AddService("messaging", messaging, []byte{})

Check failure on line 39 in test/inmem_hostservices/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `hsServer.AddService` is not checked (errcheck)
hsServer.AddService("kv", kv, []byte{})

Check failure on line 40 in test/inmem_hostservices/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `hsServer.AddService` is not checked (errcheck)
hsServer.AddService("objectstore", obj, []byte{})

_ = hsServer.Start()

<-ctx.Done()
}

0 comments on commit 3d22236

Please sign in to comment.