Skip to content

Commit

Permalink
Per-workload host services connections (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidaddict committed Jun 10, 2024
1 parent acaa84f commit 175763d
Show file tree
Hide file tree
Showing 18 changed files with 265 additions and 163 deletions.
67 changes: 42 additions & 25 deletions control-api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ type DeployRequest struct {
RetryCount *uint `json:"retry_count,omitempty"`
RetriedAt *time.Time `json:"retried_at,omitempty"`

HostServicesConfig *HostServicesConfiguration `json:"host_services,omitempty"`

WorkloadEnvironment map[string]string `json:"-"`
DecodedClaims jwt.GenericClaims `json:"-"`
}

type HostServicesConfiguration struct {
NatsUrl string `json:"nats_url"`
NatsUserJwt string `json:"nats_user_jwt"`
NatsUserSeed string `json:"nats_user_seed"`
}

var (
validWorkloadName = regexp.MustCompile(`^[a-z]+$`)
)
Expand Down Expand Up @@ -67,17 +75,18 @@ func NewDeployRequest(opts ...RequestOption) (*DeployRequest, error) {
senderPublic, _ := reqOpts.senderXkey.PublicKey()

req := &DeployRequest{
Argv: reqOpts.argv,
Description: &reqOpts.workloadDescription,
WorkloadType: reqOpts.workloadType,
Location: &reqOpts.location,
WorkloadJwt: &workloadJwt,
Environment: &encryptedEnv,
Essential: &reqOpts.essential,
SenderPublicKey: &senderPublic,
TargetNode: &reqOpts.targetNode,
TriggerSubjects: reqOpts.triggerSubjects,
JsDomain: &reqOpts.jsDomain,
Argv: reqOpts.argv,
Description: &reqOpts.workloadDescription,
WorkloadType: reqOpts.workloadType,
Location: &reqOpts.location,
WorkloadJwt: &workloadJwt,
Environment: &encryptedEnv,
Essential: &reqOpts.essential,
SenderPublicKey: &senderPublic,
TargetNode: &reqOpts.targetNode,
TriggerSubjects: reqOpts.triggerSubjects,
JsDomain: &reqOpts.jsDomain,
HostServicesConfig: reqOpts.hostServicesConfiguration,
}

return req, nil
Expand Down Expand Up @@ -143,20 +152,21 @@ func (request *DeployRequest) DecryptRequestEnvironment(recipientXKey nkeys.KeyP
}

type requestOptions struct {
argv []string
workloadName string
workloadType NexWorkload
workloadDescription string
location url.URL
env map[string]string
essential bool
senderXkey nkeys.KeyPair
claimsIssuer nkeys.KeyPair
targetPublicXKey string
jsDomain string
hash string
targetNode string
triggerSubjects []string
argv []string
workloadName string
workloadType NexWorkload
workloadDescription string
location url.URL
env map[string]string
essential bool
senderXkey nkeys.KeyPair
claimsIssuer nkeys.KeyPair
targetPublicXKey string
jsDomain string
hash string
targetNode string
triggerSubjects []string
hostServicesConfiguration *HostServicesConfiguration
}

type RequestOption func(o requestOptions) requestOptions
Expand All @@ -169,6 +179,13 @@ func Argv(argv []string) RequestOption {
}
}

func HostServicesConfig(config HostServicesConfiguration) RequestOption {
return func(o requestOptions) requestOptions {
o.hostServicesConfiguration = &config
return o
}
}

// Name of the workload. Conforms to the same name rules as the services API
func WorkloadName(name string) RequestOption {
return func(o requestOptions) requestOptions {
Expand Down
9 changes: 6 additions & 3 deletions host-services/builtins/builtins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func TestKvBuiltin(t *testing.T) {
server := hostservices.NewHostServicesServer(nc, slog.Default(), noop.NewTracerProvider().Tracer("nex-node"))
client := hostservices.NewHostServicesClient(nc, 2*time.Second, testNamespace, testWorkload, testWorkloadId)
bClient := NewBuiltinServicesClient(client)
server.SetHostServicesConnection(testWorkloadId, nc)

service, _ := NewKeyValueService(nc, slog.Default())
service, _ := NewKeyValueService(slog.Default())
err := server.AddService("kv", service, nil)
if err != nil {
t.Fatalf("Failed to add service: %s", err)
Expand Down Expand Up @@ -84,8 +85,9 @@ func TestMessagingBuiltin(t *testing.T) {
server := hostservices.NewHostServicesServer(nc, slog.Default(), noop.NewTracerProvider().Tracer("nex-node"))
client := hostservices.NewHostServicesClient(nc, 2*time.Second, testNamespace, testWorkload, testWorkloadId)
bClient := NewBuiltinServicesClient(client)
server.SetHostServicesConnection(testWorkloadId, nc)

service, _ := NewMessagingService(nc, slog.Default())
service, _ := NewMessagingService(slog.Default())
_ = server.AddService("messaging", service, nil)
_ = server.Start()

Expand All @@ -110,8 +112,9 @@ func TestObjectBuiltin(t *testing.T) {
server := hostservices.NewHostServicesServer(nc, slog.Default(), noop.NewTracerProvider().Tracer("nex-node"))
client := hostservices.NewHostServicesClient(nc, 2*time.Second, testNamespace, testWorkload, testWorkloadId)
bClient := NewBuiltinServicesClient(client)
server.SetHostServicesConnection(testWorkloadId, nc)

service, _ := NewObjectStoreService(nc, slog.Default())
service, _ := NewObjectStoreService(slog.Default())
_ = server.AddService("objectstore", service, []byte{})
_ = server.Start()

Expand Down
8 changes: 4 additions & 4 deletions host-services/builtins/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ const defaultHTTPRequestTimeoutMillis = 2500

type HTTPService struct {
log *slog.Logger
nc *nats.Conn
}

func NewHTTPService(nc *nats.Conn, log *slog.Logger) (*HTTPService, error) {
func NewHTTPService(log *slog.Logger) (*HTTPService, error) {
http := &HTTPService{
log: log,
nc: nc,
}

return http, nil
Expand All @@ -39,7 +37,9 @@ func (h *HTTPService) Initialize(_ json.RawMessage) error {
return nil
}

func (h *HTTPService) HandleRequest(namespace string,
func (h *HTTPService) HandleRequest(
_ *nats.Conn,
namespace string,
workloadId string,
method string,
workloadName string,
Expand Down
38 changes: 22 additions & 16 deletions host-services/builtins/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const kvServiceMethodKeys = "keys"

type KeyValueService struct {
log *slog.Logger
nc *nats.Conn
config kvConfig
}

Expand All @@ -29,10 +28,9 @@ type kvConfig struct {
JitProvision bool `json:"jit_provision"`
}

func NewKeyValueService(nc *nats.Conn, log *slog.Logger) (*KeyValueService, error) {
func NewKeyValueService(log *slog.Logger) (*KeyValueService, error) {
kv := &KeyValueService{
log: log,
nc: nc,
}

return kv, nil
Expand All @@ -55,6 +53,7 @@ func (k *KeyValueService) Initialize(config json.RawMessage) error {
}

func (k *KeyValueService) HandleRequest(
nc *nats.Conn,
namespace string,
workloadId string,
method string,
Expand All @@ -64,13 +63,13 @@ func (k *KeyValueService) HandleRequest(
) (hostservices.ServiceResult, error) {
switch method {
case kvServiceMethodGet:
return k.handleGet(workloadId, workloadName, request, metadata, namespace)
return k.handleGet(nc, workloadId, workloadName, request, metadata, namespace)
case kvServiceMethodSet:
return k.handleSet(workloadId, workloadName, request, metadata, namespace)
return k.handleSet(nc, workloadId, workloadName, request, metadata, namespace)
case kvServiceMethodDelete:
return k.handleDelete(workloadId, workloadName, request, metadata, namespace)
return k.handleDelete(nc, workloadId, workloadName, request, metadata, namespace)
case kvServiceMethodKeys:
return k.handleKeys(workloadId, workloadName, request, metadata, namespace)
return k.handleKeys(nc, workloadId, workloadName, request, metadata, namespace)
default:
k.log.Warn("Received invalid host services RPC request",
slog.String("service", "kv"),
Expand All @@ -81,12 +80,13 @@ func (k *KeyValueService) HandleRequest(
}

func (k *KeyValueService) handleGet(
nc *nats.Conn,
_, workload string,
_ []byte, metadata map[string]string,
namespace string,
) (hostservices.ServiceResult, error) {

kvStore, err := k.resolveKeyValueStore(namespace, workload)
kvStore, err := k.resolveKeyValueStore(nc, namespace, workload)
if err != nil {
k.log.Error(fmt.Sprintf("failed to resolve key/value store: %s", err.Error()))
return hostservices.ServiceResultFail(500, "could not resolve k/v store"), nil
Expand All @@ -106,11 +106,13 @@ func (k *KeyValueService) handleGet(
return hostservices.ServiceResultPass(200, "", entry.Value()), nil
}

func (k *KeyValueService) handleSet(_, workload string,
func (k *KeyValueService) handleSet(
nc *nats.Conn,
_, workload string,
data []byte, metadata map[string]string,
namespace string) (hostservices.ServiceResult, error) {

kvStore, err := k.resolveKeyValueStore(namespace, workload)
kvStore, err := k.resolveKeyValueStore(nc, namespace, workload)
if err != nil {
k.log.Error(fmt.Sprintf("failed to resolve key/value store: %s", err.Error()))
return hostservices.ServiceResultFail(500, "could not resolve k/v store"), nil
Expand All @@ -134,11 +136,13 @@ func (k *KeyValueService) handleSet(_, workload string,
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (k *KeyValueService) handleDelete(_, workload string,
func (k *KeyValueService) handleDelete(
nc *nats.Conn,
_, workload string,
_ []byte, metadata map[string]string,
namespace string) (hostservices.ServiceResult, error) {

kvStore, err := k.resolveKeyValueStore(namespace, workload)
kvStore, err := k.resolveKeyValueStore(nc, namespace, workload)
if err != nil {
k.log.Error(fmt.Sprintf("failed to resolve key/value store: %s", err.Error()))
return hostservices.ServiceResultFail(500, "could not resolve k/v store"), nil
Expand All @@ -161,11 +165,13 @@ func (k *KeyValueService) handleDelete(_, workload string,
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (k *KeyValueService) handleKeys(_, workload string,
func (k *KeyValueService) handleKeys(
nc *nats.Conn,
_, workload string,
_ []byte, _ map[string]string,
namespace string) (hostservices.ServiceResult, error) {

kvStore, err := k.resolveKeyValueStore(namespace, workload)
kvStore, err := k.resolveKeyValueStore(nc, namespace, workload)
if err != nil {
k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error()))
return hostservices.ServiceResultFail(500, "could not resolve k/v store"), nil
Expand All @@ -184,8 +190,8 @@ func (k *KeyValueService) handleKeys(_, workload string,
}

// resolve the key value store for this workload; initialize it if necessary
func (k *KeyValueService) resolveKeyValueStore(namespace, workload string) (nats.KeyValue, error) {
js, err := k.nc.JetStream()
func (k *KeyValueService) resolveKeyValueStore(nc *nats.Conn, namespace, workload string) (nats.KeyValue, error) {
js, err := nc.JetStream()
if err != nil {
return nil, err
}
Expand Down
30 changes: 18 additions & 12 deletions host-services/builtins/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const (

type MessagingService struct {
log *slog.Logger
nc *nats.Conn

config messagingConfig
}
Expand All @@ -32,10 +31,9 @@ type messagingConfig struct {
RequestManyTimeoutMs int64 `json:"request_many_timeout_ms"`
}

func NewMessagingService(nc *nats.Conn, log *slog.Logger) (*MessagingService, error) {
func NewMessagingService(log *slog.Logger) (*MessagingService, error) {
messaging := &MessagingService{
log: log,
nc: nc,
}

return messaging, nil
Expand All @@ -56,7 +54,9 @@ func (m *MessagingService) Initialize(config json.RawMessage) error {
return nil
}

func (m *MessagingService) HandleRequest(namespace string,
func (m *MessagingService) HandleRequest(
nc *nats.Conn,
namespace string,
workloadId string,
method string,
workloadName string,
Expand All @@ -65,11 +65,11 @@ func (m *MessagingService) HandleRequest(namespace string,

switch method {
case messagingServiceMethodPublish:
return m.handlePublish(workloadId, workloadName, request, metadata, namespace)
return m.handlePublish(nc, workloadId, workloadName, request, metadata, namespace)
case messagingServiceMethodRequest:
return m.handleRequest(workloadId, workloadName, request, metadata, namespace)
return m.handleRequest(nc, workloadId, workloadName, request, metadata, namespace)
case messagingServiceMethodRequestMany:
return m.handleRequestMany(workloadId, workloadName, request, metadata, namespace)
return m.handleRequestMany(nc, workloadId, workloadName, request, metadata, namespace)
default:
m.log.Warn("Received invalid host services RPC request",
slog.String("service", "messaging"),
Expand All @@ -79,7 +79,9 @@ func (m *MessagingService) HandleRequest(namespace string,
}
}

func (m *MessagingService) handlePublish(_, _ string,
func (m *MessagingService) handlePublish(
nc *nats.Conn,
_, _ string,
data []byte, metadata map[string]string,
_ string,
) (hostservices.ServiceResult, error) {
Expand All @@ -88,7 +90,7 @@ func (m *MessagingService) handlePublish(_, _ string,
return hostservices.ServiceResultFail(500, "subject is required"), nil
}

err := m.nc.Publish(subject, data)
err := nc.Publish(subject, data)
if err != nil {
m.log.Warn(fmt.Sprintf("failed to publish %d-byte message on subject %s: %s", len(data), subject, err.Error()))
return hostservices.ServiceResultFail(500, "failed to publish message"), nil
Expand All @@ -100,7 +102,9 @@ func (m *MessagingService) handlePublish(_, _ string,
return hostservices.ServiceResultPass(200, "", resp), nil
}

func (m *MessagingService) handleRequest(_, _ string,
func (m *MessagingService) handleRequest(
nc *nats.Conn,
_, _ string,
data []byte, metadata map[string]string,
_ string,
) (hostservices.ServiceResult, error) {
Expand All @@ -109,7 +113,7 @@ func (m *MessagingService) handleRequest(_, _ string,
return hostservices.ServiceResultFail(400, "subject is required"), nil
}

resp, err := m.nc.Request(subject, data, time.Duration(m.config.RequestTimeoutMs*int64(time.Millisecond)))
resp, err := nc.Request(subject, data, time.Duration(m.config.RequestTimeoutMs*int64(time.Millisecond)))
if err != nil {
m.log.Debug(fmt.Sprintf("failed to send %d-byte request on subject %s: %s", len(data), subject, err.Error()))
return hostservices.ServiceResultFail(500, "failed to send request"), nil
Expand All @@ -119,7 +123,9 @@ func (m *MessagingService) handleRequest(_, _ string,
return hostservices.ServiceResultPass(200, "", resp.Data), nil
}

func (m *MessagingService) handleRequestMany(_, _ string,
func (m *MessagingService) handleRequestMany(
_ *nats.Conn,
_, _ string,
_ []byte, metadata map[string]string,
_ string,
) (hostservices.ServiceResult, error) {
Expand Down
Loading

0 comments on commit 175763d

Please sign in to comment.