diff --git a/agent/providers/lib/v8.go b/agent/providers/lib/v8.go index 97569300..0913e167 100644 --- a/agent/providers/lib/v8.go +++ b/agent/providers/lib/v8.go @@ -15,17 +15,38 @@ import ( ) const ( - hostServicesObjectName = "hostServices" + hostServicesObjectName = "hostServices" + hostServicesKVObjectName = "kv" hostServicesKVGetFunctionName = "get" hostServicesKVSetFunctionName = "set" hostServicesKVDeleteFunctionName = "delete" hostServicesKVKeysFunctionName = "keys" + hostServicesKVGetTimeout = time.Millisecond * 250 + hostServicesKVSetTimeout = time.Millisecond * 250 + hostServicesKVDeleteTimeout = time.Millisecond * 250 + hostServicesKVKeysTimeout = time.Millisecond * 250 + + hostServicesMessagingObjectName = "messaging" + hostServicesMessagingPublishFunctionName = "publish" + hostServicesMessagingRequestFunctionName = "request" + hostServicesMessagingRequestManyFunctionName = "requestMany" + + hostServicesMessagingPublishTimeout = time.Millisecond * 500 + hostServicesMessagingRequestTimeout = time.Millisecond * 500 + hostServicesMessagingRequestManyTimeout = time.Millisecond * 3000 + nexTriggerSubject = "x-nex-trigger-subject" nexRuntimeNs = "x-nex-runtime-ns" - v8MaxFileSizeBytes = int64(12288) // arbitrarily ~12K, for now + messageSubject = "x-subject" + + v8FunctionArrayAppend = "array-append" + v8FunctionArrayInit = "array-init" + + v8ExecutionTimeoutMillis = 5000 + v8MaxFileSizeBytes = int64(12288) // arbitrarily ~12K, for now ) // V8 execution provider implementation @@ -46,9 +67,10 @@ type V8 struct { nc *nats.Conn // agent NATS connection - ctx *v8.Context - iso *v8.Isolate - ubs *v8.UnboundScript + ctx *v8.Context // default context for internal use only + iso *v8.Isolate + ubs *v8.UnboundScript + utils map[string]*v8.Function //v8.UnboundScript } // Deploy expects a `Validate` to have succeeded and `ubs` to be non-nil @@ -94,13 +116,16 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) { return nil, fmt.Errorf("invalid state for execution; no compiled code available for vm: %s", v.name) } - var err error + ctx, err := v.newV8Context() + if err != nil { + return nil, fmt.Errorf("failed to initialize context in vm: %s", err.Error()) + } vals := make(chan *v8.Value, 1) errs := make(chan error, 1) go func() { - val, err := v.ubs.Run(v.ctx) + val, err := v.ubs.Run(ctx) if err != nil { errs <- err return @@ -112,19 +137,19 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) { return } - argv1, err := v8.NewValue(v.ctx.Isolate(), subject) + argv1, err := v8.NewValue(ctx.Isolate(), subject) if err != nil { errs <- err return } - argv2, err := v8.NewValue(v.ctx.Isolate(), string(payload)) + argv2, err := v8.NewValue(ctx.Isolate(), string(payload)) if err != nil { errs <- err return } - val, err = fn.Call(v.ctx.Global(), argv1, argv2) + val, err = fn.Call(ctx.Global(), argv1, argv2) if err != nil { errs <- err return @@ -141,15 +166,14 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) { } return retval, nil case err := <-errs: + _, _ = v.stderr.Write([]byte(fmt.Sprintf("v8 execution failed with error: %s", err.Error()))) return nil, err - case <-time.After(time.Millisecond * agentapi.DefaultRunloopSleepTimeoutMillis): - if err != nil { - // TODO-- check for v8.JSError as this type has Message, Location and StackTrace we can log... - return nil, fmt.Errorf("failed to invoke default export: %s", err) - } - } + case <-time.After(time.Millisecond * v8ExecutionTimeoutMillis): + // if err != nil { + // } - return nil, nil + return nil, fmt.Errorf("v8 execution timed out after %dms", v8ExecutionTimeoutMillis) + } } func (v *V8) Undeploy() error { @@ -165,13 +189,10 @@ func (v *V8) Validate() error { } if v.ctx != nil { - return fmt.Errorf("invalid state for validation; v8 context already initialized for vm: %s", v.name) + return fmt.Errorf("invalid state for validation; default v8 context already initialized for vm: %s", v.name) } - err := v.initV8Context() - if err != nil { - return fmt.Errorf("failed to initialize v8 context: %s", err) - } + v.ctx = v8.NewContext(v.iso) // default context for internal use only f, err := os.Open(v.tmpFilename) if err != nil { @@ -193,46 +214,76 @@ func (v *V8) Validate() error { return fmt.Errorf("failed to open source for validation: %s", err) } - v.ubs, err = v.ctx.Isolate().CompileUnboundScript(string(src), v.tmpFilename, v8.CompileOptions{}) + v.ubs, err = v.iso.CompileUnboundScript(string(src), v.tmpFilename, v8.CompileOptions{}) if err != nil { return fmt.Errorf("failed to compile source for execution: %s", err) } + // FIXME-- move this somewhere cleaner + append, _ := v.iso.CompileUnboundScript("(arr, value) => { arr.push(value); return arr; };", "array-append.js", v8.CompileOptions{}) + appendval, _ := append.Run(v.ctx) + appendfn, _ := appendval.AsFunction() + v.utils[v8FunctionArrayAppend] = appendfn + + // FIXME-- move this somewhere cleaner + init, _ := v.iso.CompileUnboundScript("() => { let arr = []; return arr; };", "array-init.js", v8.CompileOptions{}) + initval, _ := init.Run(v.ctx) + initfn, _ := initval.AsFunction() + v.utils[v8FunctionArrayInit] = initfn + return nil } -func (v *V8) initV8Context() error { +func (v *V8) newV8Context() (*v8.Context, error) { global := v8.NewObjectTemplate(v.iso) hostServices, err := v.newHostServicesTemplate() if err != nil { - return err + return nil, err } err = global.Set(hostServicesObjectName, hostServices) if err != nil { - return err + return nil, err } - v.ctx = v8.NewContext(v.iso, global) - - return nil + return v8.NewContext(v.iso, global), nil } -// agentint.{vmID}.rpc.{namespace}.{workload}.{service}.{method} +// agentint.{vmID}.rpc.{namespace}.{workload}.kv.{method} func (v *V8) keyValueServiceSubject(method string) string { return fmt.Sprintf("agentint.%s.rpc.%s.%s.kv.%s", v.vmID, v.namespace, v.name, method) } +// agentint.{vmID}.rpc.{namespace}.{workload}.messaging.{method} +func (v *V8) messagingServiceSubject(method string) string { + return fmt.Sprintf("agentint.%s.rpc.%s.%s.messaging.%s", v.vmID, v.namespace, v.name, method) +} + func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { hostServices := v8.NewObjectTemplate(v.iso) + + err := hostServices.Set(hostServicesKVObjectName, v.newKeyValueObjectTemplate()) + if err != nil { + return nil, err + } + + err = hostServices.Set(hostServicesMessagingObjectName, v.newMessagingObjectTemplate()) + if err != nil { + return nil, err + } + + return hostServices, nil +} + +func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate { kv := v8.NewObjectTemplate(v.iso) _ = kv.Set(hostServicesKVGetFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { args := info.Args() if len(args) != 1 { - val, _ := v8.NewValue(v.iso, false) - return val + val, _ := v8.NewValue(v.iso, "key is required") + return v.iso.ThrowException(val) } key := args[0].String() @@ -241,26 +292,23 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { Key: &key, }) - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVGetFunctionName), req, time.Millisecond*250) + resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVGetFunctionName), req, hostServicesKVGetTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } var kvresp *agentapi.HostServicesKeyValueRequest err = json.Unmarshal(resp.Data, &kvresp) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } val, err := v8.JSONParse(v.ctx, string(*kvresp.Value)) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } return val @@ -269,8 +317,8 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { _ = kv.Set(hostServicesKVSetFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { args := info.Args() if len(args) != 2 { - val, _ := v8.NewValue(v.iso, false) - return val + val, _ := v8.NewValue(v.iso, "key and value are required") + return v.iso.ThrowException(val) } key := args[0].String() @@ -279,8 +327,7 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { raw, err := value.MarshalJSON() if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } val := json.RawMessage(raw) @@ -289,25 +336,22 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { Value: &val, }) - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVSetFunctionName), req, time.Millisecond*250) + resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVSetFunctionName), req, hostServicesKVSetTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } var kvresp *agentapi.HostServicesKeyValueRequest err = json.Unmarshal(resp.Data, &kvresp) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } if !*kvresp.Success { val, _ := v8.NewValue(v.iso, fmt.Sprintf("failed to set %d-byte value for key: %s", len(val), key)) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } return nil @@ -316,8 +360,8 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { _ = kv.Set(hostServicesKVDeleteFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { args := info.Args() if len(args) != 1 { - val, _ := v8.NewValue(v.iso, false) - return val + val, _ := v8.NewValue(v.iso, "key is required") + return v.iso.ThrowException(val) } key := args[0].String() @@ -326,25 +370,22 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { Key: &key, }) - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVDeleteFunctionName), req, time.Millisecond*250) + resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVDeleteFunctionName), req, hostServicesKVDeleteTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } var kvresp *agentapi.HostServicesKeyValueRequest err = json.Unmarshal(resp.Data, &kvresp) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } if !*kvresp.Success { val, _ := v8.NewValue(v.iso, fmt.Sprintf("failed to delete key: %s", key)) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } return nil @@ -353,29 +394,168 @@ func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) { _ = kv.Set(hostServicesKVKeysFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { req, _ := json.Marshal(map[string]interface{}{}) - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVKeysFunctionName), req, time.Millisecond*250) + resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVKeysFunctionName), req, hostServicesKVKeysTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } val, err := v8.JSONParse(v.ctx, string(resp.Data)) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) - _ = v.iso.ThrowException(val) - return nil + return v.iso.ThrowException(val) } return val })) - err := hostServices.Set(hostServicesKVObjectName, kv) - if err != nil { - return nil, err - } + return kv +} - return hostServices, nil +func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate { + messaging := v8.NewObjectTemplate(v.iso) + + _ = messaging.Set(hostServicesMessagingPublishFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { + args := info.Args() + if len(args) != 2 { + val, _ := v8.NewValue(v.iso, "subject and payload are required") + return v.iso.ThrowException(val) + } + + subject := args[0].String() + payload := args[1].String() + + msg := nats.NewMsg(v.messagingServiceSubject(hostServicesMessagingPublishFunctionName)) + msg.Header.Add(messageSubject, subject) + msg.Data = []byte(payload) + + resp, err := v.nc.RequestMsg(msg, hostServicesMessagingPublishTimeout) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + var msgresp *agentapi.HostServicesMessagingResponse + err = json.Unmarshal(resp.Data, &msgresp) + if err == nil && len(msgresp.Errors) > 0 { + val, _ := v8.NewValue(v.iso, msgresp.Errors[0]) + return v.iso.ThrowException(val) + } + + return nil + })) + + _ = messaging.Set(hostServicesMessagingRequestFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { + args := info.Args() + if len(args) != 2 { + val, _ := v8.NewValue(v.iso, "subject and payload are required") + return v.iso.ThrowException(val) + } + + subject := args[0].String() + payload := args[1].String() + + msg := nats.NewMsg(v.messagingServiceSubject(hostServicesMessagingRequestFunctionName)) + msg.Header.Add(messageSubject, subject) + msg.Data = []byte(payload) + + resp, err := v.nc.RequestMsg(msg, hostServicesMessagingRequestTimeout) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + var msgresp *agentapi.HostServicesMessagingResponse + err = json.Unmarshal(resp.Data, &msgresp) + if err == nil && len(msgresp.Errors) > 0 { + val, _ := v8.NewValue(v.iso, msgresp.Errors[0]) + return v.iso.ThrowException(val) + } + + val, err := v8.NewValue(v.iso, string(resp.Data)) // FIXME-- pass []byte natively into javascript using ArrayBuffer + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + return val + })) + + _ = messaging.Set(hostServicesMessagingRequestManyFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { + args := info.Args() + if len(args) != 2 { + val, _ := v8.NewValue(v.iso, "subject and payload are required") + return v.iso.ThrowException(val) + } + + subject := args[0].String() + payload := args[1].String() + + // construct the requestMany request message + msg := nats.NewMsg(v.messagingServiceSubject(hostServicesMessagingRequestManyFunctionName)) + msg.Header.Add(messageSubject, subject) + msg.Reply = v.nc.NewRespInbox() + msg.Data = []byte(payload) + + // create a synchronous subscription + sub, err := v.nc.SubscribeSync(msg.Reply) + if err != nil { + _, _ = v.stderr.Write([]byte(fmt.Sprintf("failed to subscribe sync: %s", err.Error()))) + + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + defer func() { + _ = sub.Unsubscribe() + }() + + _ = v.nc.Flush() + + // publish the requestMany request to the target subject + err = v.nc.PublishMsg(msg) + if err != nil { + _, _ = v.stderr.Write([]byte(fmt.Sprintf("failed to publish message: %s", err.Error()))) + + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + val, err := v.utils[v8FunctionArrayInit].Call(v.ctx.Global()) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + start := time.Now() + for time.Since(start) < hostServicesMessagingRequestManyTimeout { + resp, err := sub.NextMsg(hostServicesMessagingRequestTimeout) + if err != nil && !errors.Is(err, nats.ErrTimeout) { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + if resp != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("received %d-byte response", len(resp.Data)))) + + respval, err := v8.NewValue(v.iso, string(resp.Data)) // FIXME-- pass []byte natively into javascript using ArrayBuffer + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + + val, err = v.utils[v8FunctionArrayAppend].Call(v.ctx.Global(), val, respval) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } + } + } + + return val + })) + + return messaging } // convenience method to initialize a V8 execution provider @@ -410,5 +590,7 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8, nc: params.NATSConn, ctx: nil, iso: v8.NewIsolate(), + + utils: make(map[string]*v8.Function), }, nil } diff --git a/examples/v8/echofunction/src/hostservices.js b/examples/v8/echofunction/src/kv.js similarity index 100% rename from examples/v8/echofunction/src/hostservices.js rename to examples/v8/echofunction/src/kv.js diff --git a/examples/v8/echofunction/src/messaging.js b/examples/v8/echofunction/src/messaging.js new file mode 100644 index 00000000..3d04f373 --- /dev/null +++ b/examples/v8/echofunction/src/messaging.js @@ -0,0 +1,28 @@ +(subject, payload) => { + this.hostServices.messaging.publish('hello.world', payload); + + var req; + var reqEx; + + var reqMany; + var reqManyEx; + + try { + req = this.hostServices.messaging.request('hello.world.request', payload); + } catch (e) { + reqEx = e; + } + + try { + reqMany = this.hostServices.messaging.requestMany('hello.world.request.many', payload); + } catch (e) { + reqManyEx = e; + } + + return { + 'hello.world.request': req, + 'hello.world.request.ex': reqEx, + 'hello.world.request.many': reqMany, + 'hello.world.request.many.ex': reqManyEx + } +}; diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index 6a54242a..0924d05d 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -152,6 +152,16 @@ type HostServicesKeyValueRequest struct { Success *bool `json:"success,omitempty"` } +type HostServicesMessagingRequest struct { + Subject *string `json:"key"` + Payload *json.RawMessage `json:"payload,omitempty"` +} + +type HostServicesMessagingResponse struct { + Errors []string `json:"errors,omitempty"` + Success bool `json:"success,omitempty"` +} + type MachineMetadata struct { VmID *string `json:"vmid"` NodeNatsHost *string `json:"node_nats_host"` diff --git a/internal/node/machine_mgr.go b/internal/node/machine_mgr.go index ce652e75..066078e6 100644 --- a/internal/node/machine_mgr.go +++ b/internal/node/machine_mgr.go @@ -587,16 +587,14 @@ func (m *MachineManager) generateTriggerHandler(vm *runningFirecracker, tsub str m.t.functionRunTimeNano.Add(m.ctx, runTimeNs64, metric.WithAttributes(attribute.String("namespace", vm.namespace))) m.t.functionRunTimeNano.Add(m.ctx, runTimeNs64, metric.WithAttributes(attribute.String("workload_name", *vm.deployRequest.WorkloadName))) - if len(resp.Data) > 0 { - err = msg.Respond(resp.Data) - if err != nil { - m.log.Error("Failed to respond to trigger subject subscription request for deployed workload", - slog.String("vmid", vm.vmmID), - slog.String("trigger_subject", tsub), - slog.String("workload_type", *request.WorkloadType), - slog.Any("err", err), - ) - } + err = msg.Respond(resp.Data) + if err != nil { + m.log.Error("Failed to respond to trigger subject subscription request for deployed workload", + slog.String("vmid", vm.vmmID), + slog.String("trigger_subject", tsub), + slog.String("workload_type", *request.WorkloadType), + slog.Any("err", err), + ) } } } diff --git a/internal/node/services/lib/messaging.go b/internal/node/services/lib/messaging.go index 663c2147..5fb0d7ef 100644 --- a/internal/node/services/lib/messaging.go +++ b/internal/node/services/lib/messaging.go @@ -1,16 +1,25 @@ package lib import ( + "encoding/json" + "errors" + "fmt" "log/slog" "strings" + "time" "github.com/nats-io/nats.go" + agentapi "github.com/synadia-io/nex/internal/agent-api" ) -// Messaging operations available: -// Publish -// Request -// RequestMany +const messagingServiceMethodPublish = "publish" +const messagingServiceMethodRequest = "request" +const messagingServiceMethodRequestMany = "requestMany" + +const messagingRequestTimeout = time.Millisecond * 500 // FIXME-- make timeout configurable per request? +const messagingRequestManyTimeout = time.Millisecond * 3000 + +const messageSubject = "x-subject" type MessagingService struct { log *slog.Logger @@ -42,12 +51,160 @@ func (m *MessagingService) HandleRPC(msg *nats.Msg) { method := tokens[6] switch method { + case messagingServiceMethodPublish: + m.handlePublish(msg) + case messagingServiceMethodRequest: + m.handleRequest(msg) + case messagingServiceMethodRequestMany: + m.handleRequestMany(msg) default: m.log.Warn("Received invalid host services RPC request", slog.String("service", service), slog.String("method", method), ) + } +} + +func (m *MessagingService) handlePublish(msg *nats.Msg) { + subject := msg.Header.Get(messageSubject) + if subject == "" { + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{"subject is required"}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + err := m.nc.Publish(subject, msg.Data) + if err != nil { + m.log.Warn(fmt.Sprintf("failed to publish %d-byte message on subject %s: %s", len(msg.Data), subject, err.Error())) + + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{fmt.Sprintf("failed to publish %d-byte message on subject %s: %s", len(msg.Data), subject, err.Error())}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Success: true, + }) + err = msg.Respond(resp) + if err != nil { + m.log.Warn(fmt.Sprintf("failed to respond to messaging host service request: %s", err.Error())) + } +} + +func (m *MessagingService) handleRequest(msg *nats.Msg) { + subject := msg.Header.Get(messageSubject) + if subject == "" { + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{"subject is required"}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + resp, err := m.nc.Request(subject, msg.Data, messagingRequestTimeout) + if err != nil { + m.log.Debug(fmt.Sprintf("failed to send %d-byte request on subject %s: %s", len(msg.Data), subject, err.Error())) + + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{fmt.Sprintf("failed to send %d-byte request on subject %s: %s", len(msg.Data), subject, err.Error())}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + m.log.Debug(fmt.Sprintf("received %d-byte response to request on subject: %s", len(resp.Data), subject)) + + err = msg.Respond(resp.Data) + if err != nil { + m.log.Warn(fmt.Sprintf("failed to respond to messaging host service request: %s", err.Error())) + } +} + +func (m *MessagingService) handleRequestMany(msg *nats.Msg) { + subject := msg.Header.Get(messageSubject) + if subject == "" { + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{"subject is required"}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + // create a new response inbox and synchronous subscription + replyTo := m.nc.NewRespInbox() + sub, err := m.nc.SubscribeSync(replyTo) + if err != nil { + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{fmt.Sprintf("failed to subscribe to response inbox: %s", err.Error())}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + return + } + + defer func() { + _ = sub.Unsubscribe() + }() + + _ = m.nc.Flush() + + // publish the original requestMany request to the target subject + err = m.nc.PublishRequest(subject, replyTo, msg.Data) + if err != nil { + resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ + Errors: []string{fmt.Sprintf("failed to send %d-byte request to subject: %s: %s", len(msg.Data), subject, err.Error())}, + }) + + err := msg.Respond(resp) + if err != nil { + m.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) + } + + return + } + + start := time.Now() + for time.Since(start) < messagingRequestManyTimeout { + resp, err := sub.NextMsg(messagingRequestTimeout) + if err != nil && !errors.Is(err, nats.ErrTimeout) { + break + } + + if resp != nil { + m.log.Debug(fmt.Sprintf("received %d-byte response to request on subject: %s", len(resp.Data), subject)) - // msg.Respond() + // respond to the requestMany message + err = msg.Respond(resp.Data) + if err != nil { + m.log.Error(fmt.Sprintf("failed to publish %d-byte response to reply subject: %s: %s", len(resp.Data), msg.Reply, err.Error())) + } + } } } diff --git a/spec/node_test.go b/spec/node_test.go index 0ab1372b..9c1f1732 100644 --- a/spec/node_test.go +++ b/spec/node_test.go @@ -547,46 +547,133 @@ var _ = Describe("nex node", func() { }) Describe("host services", func() { - Context("when the javascript is valid", func() { - JustBeforeEach(func() { - triggerSubject = "hellohostservices" - deployRequest, err = newDeployRequest(*nodeID, "hostservices", "nex host services example", "../examples/v8/echofunction/src/hostservices.js", map[string]string{}, []string{triggerSubject}, log) - Expect(err).To(BeNil()) + Context("key value service", func() { + Context("when the javascript is valid", func() { + JustBeforeEach(func() { + triggerSubject = "hellokvservice" + deployRequest, err = newDeployRequest(*nodeID, "kvhostservice", "nex key value service example", "../examples/v8/echofunction/src/kv.js", map[string]string{}, []string{triggerSubject}, log) + Expect(err).To(BeNil()) - nodeClient := controlapi.NewApiClientWithNamespace(_fixtures.natsConn, time.Millisecond*1000, "default", log) - _, err = nodeClient.StartWorkload(deployRequest) - Expect(err).To(BeNil()) + nodeClient := controlapi.NewApiClientWithNamespace(_fixtures.natsConn, time.Millisecond*1000, "default", log) + _, err = nodeClient.StartWorkload(deployRequest) + Expect(err).To(BeNil()) - time.Sleep(time.Millisecond * 1000) - }) + time.Sleep(time.Millisecond * 1000) + }) - Describe("triggering the deployed function", func() { - var respmsg *nats.Msg + Describe("triggering the deployed function", func() { + var respmsg *nats.Msg + + JustBeforeEach(func() { + respmsg, err = _fixtures.natsConn.Request(triggerSubject, []byte("hello!"), time.Millisecond*15000) + Expect(err).To(BeNil()) + }) + + It("should respond to the request with the list of keys and value of hello2", func(ctx SpecContext) { + Expect(respmsg).NotTo(BeNil()) + + type hostServicesExampleResp struct { + Keys []string `json:"keys"` + Hello2 string `json:"hello2"` + } + + var resp *hostServicesExampleResp + err = json.Unmarshal(respmsg.Data, &resp) + Expect(err).To(BeNil()) + + Expect(resp).ToNot(BeNil()) + + Expect(len(resp.Keys)).To(Equal(1)) + Expect(resp.Keys[0]).To(Equal("hello2")) + Expect(resp.Hello2).To(Equal("hello!")) + }) + }) + }) + }) + Context("messaging service", func() { + Context("when the javascript is valid", func() { JustBeforeEach(func() { - respmsg, err = _fixtures.natsConn.Request(triggerSubject, []byte("hello!"), time.Millisecond*15000) + triggerSubject = "hellomessagingservice" + deployRequest, err = newDeployRequest(*nodeID, "messaginghostservice", "nex messaging service example", "../examples/v8/echofunction/src/messaging.js", map[string]string{}, []string{triggerSubject}, log) + Expect(err).To(BeNil()) + + nodeClient := controlapi.NewApiClientWithNamespace(_fixtures.natsConn, time.Millisecond*1000, "default", log) + _, err = nodeClient.StartWorkload(deployRequest) Expect(err).To(BeNil()) + + time.Sleep(time.Millisecond * 1000) }) - It("should respond to the request with the list of keys and value of hello2", func(ctx SpecContext) { - Expect(respmsg).NotTo(BeNil()) + Describe("triggering the deployed function", func() { + var respmsg *nats.Msg + var sub *nats.Subscription - type hostServicesExampleResp struct { - Keys []string `json:"keys"` - Hello2 string `json:"hello2"` - } + BeforeEach(func() { + sub, _ = _fixtures.natsConn.Subscribe("hello.world.request", func(msg *nats.Msg) { + resp := fmt.Sprintf("resp: %s", string(msg.Data)) + _ = msg.Respond([]byte(resp)) + }) - var resp *hostServicesExampleResp - err = json.Unmarshal(respmsg.Data, &resp) - Expect(err).To(BeNil()) + sub, _ = _fixtures.natsConn.Subscribe("hello.world.request.many", func(msg *nats.Msg) { + resp := fmt.Sprintf("resp #1: %s", string(msg.Data)) + _ = msg.Respond([]byte(resp)) + + resp = fmt.Sprintf("resp #2: %s", string(msg.Data)) + _ = msg.Respond([]byte(resp)) + }) + }) + + AfterEach(func() { + _ = sub.Unsubscribe() + sub = nil + }) - Expect(resp).ToNot(BeNil()) + JustBeforeEach(func() { + respmsg, err = _fixtures.natsConn.Request(triggerSubject, []byte("asdfghjkl;'"), time.Millisecond*7500) + Expect(err).To(BeNil()) + }) - Expect(len(resp.Keys)).To(Equal(1)) - Expect(resp.Keys[0]).To(Equal("hello2")) - Expect(resp.Hello2).To(Equal("hello!")) + It("should respond to the request with the hello.world.request response value", func(ctx SpecContext) { + Expect(respmsg).NotTo(BeNil()) + + type hostServicesExampleResp struct { + HelloWorldRequest string `json:"hello.world.request"` + HelloWorldRequestMany []string `json:"hello.world.request.many"` + } + + var resp *hostServicesExampleResp + err = json.Unmarshal(respmsg.Data, &resp) + Expect(err).To(BeNil()) + + Expect(resp).ToNot(BeNil()) + + Expect(resp.HelloWorldRequest).ToNot(BeNil()) + Expect(resp.HelloWorldRequest).To(Equal("resp: asdfghjkl;'")) + }) + + It("should respond to the request with the hello.world.request.many response values", func(ctx SpecContext) { + Expect(respmsg).NotTo(BeNil()) + + type hostServicesExampleResp struct { + HelloWorldRequest string `json:"hello.world.request"` + HelloWorldRequestMany []string `json:"hello.world.request.many"` + } + + var resp *hostServicesExampleResp + err = json.Unmarshal(respmsg.Data, &resp) + Expect(err).To(BeNil()) + + Expect(resp).ToNot(BeNil()) + + Expect(resp.HelloWorldRequestMany).ToNot(BeNil()) + Expect(len(resp.HelloWorldRequestMany)).To(Equal(2)) + Expect(resp.HelloWorldRequestMany[0]).To(Equal("resp #1: asdfghjkl;'")) + Expect(resp.HelloWorldRequestMany[1]).To(Equal("resp #2: asdfghjkl;'")) + }) }) }) + }) }) })