diff --git a/agent/providers/lib/v8.go b/agent/providers/lib/v8.go index 1a9d9170..c705dee5 100644 --- a/agent/providers/lib/v8.go +++ b/agent/providers/lib/v8.go @@ -63,15 +63,11 @@ const ( hostServicesObjectStoreDeleteTimeout = time.Millisecond * 3000 hostServicesObjectStoreListTimeout = time.Millisecond * 3000 - nexTriggerSubject = "x-nex-trigger-subject" - nexRuntimeNs = "x-nex-runtime-ns" - - messageSubject = "x-subject" - - objectName = "x-object-name" - - v8FunctionArrayAppend = "array-append" - v8FunctionArrayInit = "array-init" + v8FunctionArrayAppend = "array-append" + v8FunctionArrayInit = "array-init" + v8FunctionUInt8ArrayInit = "uint8-array-init" + v8FunctionUInt8ArraySetIdx = "uint8-array-set-idx" + v8FunctionUInt8ArrayToString = "uint8-array-to-string" v8ExecutionTimeoutMillis = 5000 v8MaxFileSizeBytes = int64(12288) // arbitrarily ~12K, for now @@ -110,7 +106,7 @@ func (v *V8) Deploy() error { subject := fmt.Sprintf("agentint.%s.trigger", v.vmID) _, err := v.nc.Subscribe(subject, func(msg *nats.Msg) { startTime := time.Now() - val, err := v.Execute(msg.Header.Get(nexTriggerSubject), msg.Data) + val, err := v.Execute(msg.Header.Get(agentapi.NexTriggerSubject), msg.Data) if err != nil { _, _ = v.stderr.Write([]byte(fmt.Sprintf("failed to execute function on trigger subject %s: %s", subject, err.Error()))) return @@ -120,7 +116,7 @@ func (v *V8) Deploy() error { err = msg.RespondMsg(&nats.Msg{ Data: val, Header: nats.Header{ - nexRuntimeNs: []string{strconv.FormatInt(runtimeNanos, 10)}, + agentapi.NexRuntimeNs: []string{strconv.FormatInt(runtimeNanos, 10)}, }, }) if err != nil { @@ -171,8 +167,9 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) { return } - argv2, err := v8.NewValue(ctx.Isolate(), string(payload)) + argv2, err := v.toUInt8ArrayValue(payload) if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to convert raw %d-length []byte to Uint8[]: %s", len(payload), err.Error()))) errs <- err return } @@ -188,6 +185,8 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) { select { case val := <-vals: + // FIXME-- switch on val type or are we ok with forcing a JSON response? + retval, err := val.MarshalJSON() if err != nil { return nil, err @@ -247,19 +246,36 @@ func (v *V8) Validate() error { return fmt.Errorf("failed to compile source for execution: %s", err) } - // FIXME-- move this somewhere cleaner + v.initUtils() + + return nil +} + +func (v *V8) initUtils() { append, _ := v.iso.CompileUnboundScript("(arr, value) => { arr.push(value); return arr; };", "array-append.js", v8.CompileOptions{}) appendval, _ := append.Run(v.ctx) appendfn, _ := appendval.AsFunction() v.utils[v8FunctionArrayAppend] = appendfn - // FIXME-- move this somewhere cleaner init, _ := v.iso.CompileUnboundScript("() => { let arr = []; return arr; };", "array-init.js", v8.CompileOptions{}) initval, _ := init.Run(v.ctx) initfn, _ := initval.AsFunction() v.utils[v8FunctionArrayInit] = initfn - return nil + inituint8, _ := v.iso.CompileUnboundScript("(len) => { let arr = new Uint8Array(Number(len)); return arr; };", "uint8-array-init.js", v8.CompileOptions{}) + inituint8val, _ := inituint8.Run(v.ctx) + inituint8fn, _ := inituint8val.AsFunction() + v.utils[v8FunctionUInt8ArrayInit] = inituint8fn + + uint8arrsetidx, _ := v.iso.CompileUnboundScript("(arr, i, value) => { arr[Number(i)] = value; return arr; };", "uint8-array-set-idx.js", v8.CompileOptions{}) + uint8arrsetidxval, _ := uint8arrsetidx.Run(v.ctx) + uint8arrsetidxfn, _ := uint8arrsetidxval.AsFunction() + v.utils[v8FunctionUInt8ArraySetIdx] = uint8arrsetidxfn + + uint8arrtostr, _ := v.iso.CompileUnboundScript("(arr) => { return String.fromCharCode(...arr); };", "uint8-array-to-string.js", v8.CompileOptions{}) + uint8arrtostrval, _ := uint8arrtostr.Run(v.ctx) + uint8arrtostrfn, _ := uint8arrtostrval.AsFunction() + v.utils[v8FunctionUInt8ArrayToString] = uint8arrtostrfn } func (v *V8) newV8Context() (*v8.Context, error) { @@ -342,12 +358,10 @@ func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - req, _ := json.Marshal(&agentapi.HostServicesHTTPRequest{ - Method: hostServicesHTTPGetFunctionName, - URL: _url.String(), - }) + msg := nats.NewMsg(v.httpServiceSubject(hostServicesHTTPGetFunctionName)) + msg.Header.Add(agentapi.HttpURLHeader, _url.String()) - resp, err := v.nc.Request(v.httpServiceSubject(hostServicesHTTPGetFunctionName), req, hostServicesHTTPRequestTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesHTTPRequestTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -420,24 +434,17 @@ func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - var data string - if len(args) > 1 { - payload := args[1] - if payload.IsObject() || payload.IsArray() { - payloadJSON, _ := payload.MarshalJSON() - data = string(payloadJSON) - } else { - data = payload.String() - } + payload, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) } - req, _ := json.Marshal(&agentapi.HostServicesHTTPRequest{ - Method: hostServicesHTTPPostFunctionName, - URL: _url.String(), - Body: &data, - }) + msg := nats.NewMsg(v.httpServiceSubject(hostServicesHTTPPostFunctionName)) + msg.Header.Add(agentapi.HttpURLHeader, _url.String()) + msg.Data = []byte(payload) - resp, err := v.nc.Request(v.httpServiceSubject(hostServicesHTTPPostFunctionName), req, hostServicesHTTPRequestTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesHTTPRequestTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -510,24 +517,17 @@ func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - var data string - if len(args) > 1 { - payload := args[1] - if payload.IsObject() || payload.IsArray() { - payloadJSON, _ := payload.MarshalJSON() - data = string(payloadJSON) - } else { - data = payload.String() - } + payload, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) } - req, _ := json.Marshal(&agentapi.HostServicesHTTPRequest{ - Method: hostServicesHTTPPutFunctionName, - URL: _url.String(), - Body: &data, - }) + msg := nats.NewMsg(v.httpServiceSubject(hostServicesHTTPPutFunctionName)) + msg.Header.Add(agentapi.HttpURLHeader, _url.String()) + msg.Data = []byte(payload) - resp, err := v.nc.Request(v.httpServiceSubject(hostServicesHTTPPutFunctionName), req, hostServicesHTTPRequestTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesHTTPRequestTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -600,24 +600,17 @@ func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - var data string - if len(args) > 1 { - payload := args[1] - if payload.IsObject() || payload.IsArray() { - payloadJSON, _ := payload.MarshalJSON() - data = string(payloadJSON) - } else { - data = payload.String() - } + payload, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) } - req, _ := json.Marshal(&agentapi.HostServicesHTTPRequest{ - Method: hostServicesHTTPPatchFunctionName, - URL: _url.String(), - Body: &data, - }) + msg := nats.NewMsg(v.httpServiceSubject(hostServicesHTTPPatchFunctionName)) + msg.Header.Add(agentapi.HttpURLHeader, _url.String()) + msg.Data = []byte(payload) - resp, err := v.nc.Request(v.httpServiceSubject(hostServicesHTTPPatchFunctionName), req, hostServicesHTTPRequestTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesHTTPRequestTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -690,24 +683,10 @@ func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - var data string - if len(args) > 1 { - payload := args[1] - if payload.IsObject() || payload.IsArray() { - payloadJSON, _ := payload.MarshalJSON() - data = string(payloadJSON) - } else { - data = payload.String() - } - } + msg := nats.NewMsg(v.httpServiceSubject(hostServicesHTTPDeleteFunctionName)) + msg.Header.Add(agentapi.HttpURLHeader, _url.String()) - req, _ := json.Marshal(&agentapi.HostServicesHTTPRequest{ - Method: hostServicesHTTPDeleteFunctionName, - URL: _url.String(), - Body: &data, // this should not be present for DELETE requests, but it is not explicitly forbidden. so we'll allow it - }) - - resp, err := v.nc.Request(v.httpServiceSubject(hostServicesHTTPDeleteFunctionName), req, hostServicesHTTPRequestTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesHTTPRequestTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -780,12 +759,10 @@ func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - req, _ := json.Marshal(&agentapi.HostServicesHTTPRequest{ - Method: hostServicesHTTPHeadFunctionName, - URL: _url.String(), - }) + msg := nats.NewMsg(v.httpServiceSubject(hostServicesHTTPHeadFunctionName)) + msg.Header.Add(agentapi.HttpURLHeader, _url.String()) - resp, err := v.nc.Request(v.httpServiceSubject(hostServicesHTTPHeadFunctionName), req, hostServicesHTTPRequestTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesHTTPRequestTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -852,25 +829,18 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate { key := args[0].String() - req, _ := json.Marshal(&agentapi.HostServicesKeyValueRequest{ - Key: &key, - }) - - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVGetFunctionName), req, hostServicesKVGetTimeout) - if err != nil { - val, _ := v8.NewValue(v.iso, err.Error()) - return v.iso.ThrowException(val) - } + msg := nats.NewMsg(v.keyValueServiceSubject(hostServicesKVGetFunctionName)) + msg.Header.Add(agentapi.KeyValueKeyHeader, key) - var kvresp *agentapi.HostServicesKeyValueRequest - err = json.Unmarshal(resp.Data, &kvresp) + resp, err := v.nc.RequestMsg(msg, hostServicesKVGetTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } - val, err := v8.JSONParse(v.ctx, string(*kvresp.Value)) + val, err := v.toUInt8ArrayValue(resp.Data) if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to convert raw %d-length []byte to Uint8[]: %s", len(resp.Data), err.Error()))) val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } @@ -886,27 +856,24 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate { } key := args[0].String() - value := args[1] - raw, err := value.MarshalJSON() // FIXME-- support Uint8 array + value, err := v.marshalValue(args[1]) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } - val := json.RawMessage(raw) - req, _ := json.Marshal(&agentapi.HostServicesKeyValueRequest{ - Key: &key, - Value: &val, - }) + msg := nats.NewMsg(v.keyValueServiceSubject(hostServicesKVSetFunctionName)) + msg.Header.Add(agentapi.KeyValueKeyHeader, key) + msg.Data = value - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVSetFunctionName), req, hostServicesKVSetTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesKVSetTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } - var kvresp *agentapi.HostServicesKeyValueRequest + var kvresp *agentapi.HostServicesKeyValueResponse err = json.Unmarshal(resp.Data, &kvresp) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) @@ -914,7 +881,7 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate { } if !*kvresp.Success { - val, _ := v8.NewValue(v.iso, fmt.Sprintf("failed to set %d-byte value for key: %s", len(val), key)) + val, _ := v8.NewValue(v.iso, fmt.Sprintf("failed to set %d-byte value for key: %s", len(value), key)) return v.iso.ThrowException(val) } @@ -930,17 +897,16 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate { key := args[0].String() - req, _ := json.Marshal(&agentapi.HostServicesKeyValueRequest{ - Key: &key, - }) + msg := nats.NewMsg(v.keyValueServiceSubject(hostServicesKVDeleteFunctionName)) + msg.Header.Add(agentapi.KeyValueKeyHeader, key) - resp, err := v.nc.Request(v.keyValueServiceSubject(hostServicesKVDeleteFunctionName), req, hostServicesKVDeleteTimeout) + resp, err := v.nc.RequestMsg(msg, hostServicesKVDeleteTimeout) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } - var kvresp *agentapi.HostServicesKeyValueRequest + var kvresp *agentapi.HostServicesKeyValueResponse err = json.Unmarshal(resp.Data, &kvresp) if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) @@ -987,10 +953,14 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate { } subject := args[0].String() - payload := args[1].String() + payload, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } msg := nats.NewMsg(v.messagingServiceSubject(hostServicesMessagingPublishFunctionName)) - msg.Header.Add(messageSubject, subject) + msg.Header.Add(agentapi.MessagingSubjectHeader, subject) msg.Data = []byte(payload) resp, err := v.nc.RequestMsg(msg, hostServicesMessagingPublishTimeout) @@ -1017,10 +987,14 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate { } subject := args[0].String() - payload := args[1].String() + payload, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } msg := nats.NewMsg(v.messagingServiceSubject(hostServicesMessagingRequestFunctionName)) - msg.Header.Add(messageSubject, subject) + msg.Header.Add(agentapi.MessagingSubjectHeader, subject) msg.Data = []byte(payload) resp, err := v.nc.RequestMsg(msg, hostServicesMessagingRequestTimeout) @@ -1029,15 +1003,9 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - var msgresp *agentapi.HostServicesMessagingResponse - err = json.Unmarshal(resp.Data, &msgresp) - if err == nil && len(msgresp.Errors) > 0 { - val, _ := v8.NewValue(v.iso, msgresp.Errors[0]) - return v.iso.ThrowException(val) - } - - val, err := v8.NewValue(v.iso, string(resp.Data)) // FIXME-- pass []byte natively into javascript using ArrayBuffer + val, err := v.toUInt8ArrayValue(resp.Data) if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to convert raw %d-length []byte to Uint8[]: %s", len(resp.Data), err.Error()))) val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } @@ -1053,11 +1021,16 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate { } subject := args[0].String() - payload := args[1].String() + + payload, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } // construct the requestMany request message msg := nats.NewMsg(v.messagingServiceSubject(hostServicesMessagingRequestManyFunctionName)) - msg.Header.Add(messageSubject, subject) + msg.Header.Add(agentapi.MessagingSubjectHeader, subject) msg.Reply = v.nc.NewRespInbox() msg.Data = []byte(payload) @@ -1102,8 +1075,9 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate { if resp != nil { _, _ = v.stdout.Write([]byte(fmt.Sprintf("received %d-byte response", len(resp.Data)))) - respval, err := v8.NewValue(v.iso, string(resp.Data)) // FIXME-- pass []byte natively into javascript using ArrayBuffer + respval, err := v.toUInt8ArrayValue(resp.Data) if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to convert raw %d-length []byte to Uint8[]: %s", len(resp.Data), err.Error()))) val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } @@ -1135,7 +1109,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate { name := args[0].String() msg := nats.NewMsg(v.objectStoreServiceSubject(hostServicesObjectStoreGetFunctionName)) - msg.Header.Add(objectName, name) + msg.Header.Add(agentapi.ObjectStoreObjectNameHeader, name) resp, err := v.nc.RequestMsg(msg, hostServicesObjectStoreGetTimeout) if err != nil { @@ -1143,8 +1117,9 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - val, err := v8.NewValue(v.iso, string(resp.Data)) // FIXME-- pass []byte natively into javascript using ArrayBuffer + val, err := v.toUInt8ArrayValue(resp.Data) if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to convert raw %d-length []byte to Uint8[]: %s", len(resp.Data), err.Error()))) val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) } @@ -1160,25 +1135,15 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate { } name := args[0].String() - value := args[1].String() - - // FIXME- resolve primitive value from the following types: - // string -> V8::String - // int32 -> V8::Integer - // uint32 -> V8::Integer - // int64 -> V8::BigInt - // uint64 -> V8::BigInt - // bool -> V8::Boolean - // *big.Int -> V8::BigInt - - // raw, err := value.MarshalJSON() // FIXME-- support Uint8 array - // if err != nil { - // val, _ := v8.NewValue(v.iso, err.Error()) - // return v.iso.ThrowException(val) - // } + + value, err := v.marshalValue(args[1]) + if err != nil { + val, _ := v8.NewValue(v.iso, err.Error()) + return v.iso.ThrowException(val) + } msg := nats.NewMsg(v.objectStoreServiceSubject(hostServicesObjectStorePutFunctionName)) - msg.Header.Add(objectName, name) + msg.Header.Add(agentapi.ObjectStoreObjectNameHeader, name) msg.Data = []byte(value) resp, err := v.nc.RequestMsg(msg, hostServicesObjectStorePutTimeout) @@ -1187,7 +1152,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate { return v.iso.ThrowException(val) } - val, err := v8.JSONParse(v.ctx, string(resp.Data)) + val, err := v8.JSONParse(v.ctx, string(resp.Data)) // nats.ObjectMeta JSON if err != nil { val, _ := v8.NewValue(v.iso, err.Error()) return v.iso.ThrowException(val) @@ -1206,7 +1171,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate { name := args[0].String() msg := nats.NewMsg(v.objectStoreServiceSubject(hostServicesObjectStoreDeleteFunctionName)) - msg.Header.Add(objectName, name) + msg.Header.Add(agentapi.ObjectStoreObjectNameHeader, name) resp, err := v.nc.RequestMsg(msg, hostServicesObjectStoreDeleteTimeout) if err != nil { @@ -1250,6 +1215,60 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate { return objectStore } +// marshal the given v8 value to an array of bytes that can be sent over the wire +func (v *V8) marshalValue(val *v8.Value) ([]byte, error) { + if val.IsUint8Array() { + v, err := v.utils[v8FunctionUInt8ArrayToString].Call(v.ctx.Global(), val) + if err != nil { + return nil, err + } + + return []byte(v.String()), nil + } + + return nil, fmt.Errorf("failed to marshal v8 value to []byte: %v; only Uint8[] is supported", val) +} + +// unmarshal the given []byte value into a native Uint8Array which can be handed back into v8 +func (v *V8) toUInt8ArrayValue(val []byte) (*v8.Value, error) { + // initialize a v8 value representing the size in bytes of the native Uint8Array to be allocated + len, err := v8.NewValue(v.iso, uint64(len(val))) + if err != nil { + return nil, err + } + + // initialize a native Uint8Array + nativeUint8Arr, err := v.utils[v8FunctionUInt8ArrayInit].Call(v.ctx.Global(), len) + if err != nil { + return nil, err + } + + for i, _uint := range val { + // initialize a v8 value representing the current byte offset in our native Uint8Array + _i, err := v8.NewValue(v.iso, uint64(i)) + if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to cast i index value to uint32: %s", err.Error()))) + return nil, err + } + + // pack 8 bits into a uint32, as this is needed when initializing a v8.Value + _val, err := v8.NewValue(v.iso, uint32(_uint)) + if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to cast byte to uint32: %s", err.Error()))) + return nil, err + } + + // write 8 bits to the current byte offset in the native Uint8Array + nativeUint8Arr, err = v.utils[v8FunctionUInt8ArraySetIdx].Call(v.ctx.Global(), nativeUint8Arr, _i, _val) + if err != nil { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("failed to call %s: %s", v8FunctionUInt8ArraySetIdx, err.Error()))) + return nil, err + } + } + + return nativeUint8Arr, nil +} + // convenience method to initialize a V8 execution provider func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8, error) { if params.WorkloadName == nil { @@ -1260,10 +1279,6 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8, return nil, errors.New("V8 execution provider requires a temporary filename parameter") } - // if params.TotalBytes == nil { - // return nil, errors.New("V8 execution provider requires a total bytes parameter") - // } - return &V8{ environment: params.Environment, name: *params.WorkloadName, diff --git a/examples/v8/echofunction/src/echofunction.js b/examples/v8/echofunction/src/echofunction.js index 8127eaa8..e02fa5d9 100644 --- a/examples/v8/echofunction/src/echofunction.js +++ b/examples/v8/echofunction/src/echofunction.js @@ -2,6 +2,6 @@ console.log(subject); return { triggered_on: subject, - payload: payload + payload: String.fromCharCode(...payload) } }; diff --git a/examples/v8/echofunction/src/kv.js b/examples/v8/echofunction/src/kv.js index 1b45fa1b..21f89338 100644 --- a/examples/v8/echofunction/src/kv.js +++ b/examples/v8/echofunction/src/kv.js @@ -5,6 +5,6 @@ this.hostServices.kv.set('hello2', payload); return { keys: this.hostServices.kv.keys(), - hello2: this.hostServices.kv.get('hello2') + hello2: String.fromCharCode(...this.hostServices.kv.get('hello2')) } }; diff --git a/examples/v8/echofunction/src/messaging.js b/examples/v8/echofunction/src/messaging.js index 3d04f373..66f8a1c5 100644 --- a/examples/v8/echofunction/src/messaging.js +++ b/examples/v8/echofunction/src/messaging.js @@ -1,28 +1,38 @@ (subject, payload) => { this.hostServices.messaging.publish('hello.world', payload); - var req; + var reqResp; var reqEx; - var reqMany; + var reqManyResp; var reqManyEx; try { - req = this.hostServices.messaging.request('hello.world.request', payload); + reqResp = this.hostServices.messaging.request('hello.world.request', payload); + reqResp = String.fromCharCode(...reqResp) } catch (e) { reqEx = e; } try { - reqMany = this.hostServices.messaging.requestMany('hello.world.request.many', payload); + reqManyResp = [] + + let responses = this.hostServices.messaging.requestMany('hello.world.request.many', payload) + + // responses is an array of Uint8Array... flatten it so we can use each value + responses = Array.prototype.slice.call(responses) + + for (let i = 0; i < responses.length; i++) { + reqManyResp.push(String.fromCharCode(...responses[i])) + } } catch (e) { reqManyEx = e; } return { - 'hello.world.request': req, + 'hello.world.request': reqResp, 'hello.world.request.ex': reqEx, - 'hello.world.request.many': reqMany, + 'hello.world.request.many': reqManyResp, 'hello.world.request.many.ex': reqManyEx } }; diff --git a/examples/v8/echofunction/src/objectstore.js b/examples/v8/echofunction/src/objectstore.js index dc1ec040..a07d84f3 100644 --- a/examples/v8/echofunction/src/objectstore.js +++ b/examples/v8/echofunction/src/objectstore.js @@ -5,6 +5,6 @@ this.hostServices.objectStore.put('hello2', payload); return { list: this.hostServices.objectStore.list(), - hello2: this.hostServices.objectStore.get('hello2') + hello2: String.fromCharCode(...this.hostServices.objectStore.get('hello2')) } }; diff --git a/internal/agent-api/client.go b/internal/agent-api/client.go index 9761f52a..120b9737 100644 --- a/internal/agent-api/client.go +++ b/internal/agent-api/client.go @@ -23,7 +23,16 @@ type EventCallback func(string, cloudevents.Event) type LogCallback func(string, LogEntry) const ( - nexTriggerSubject = "x-nex-trigger-subject" + NexTriggerSubject = "x-nex-trigger-subject" + NexRuntimeNs = "x-nex-runtime-ns" + + HttpURLHeader = "x-http-url" + + KeyValueKeyHeader = "x-keyvalue-key" + + MessagingSubjectHeader = "x-subject" + + ObjectStoreObjectNameHeader = "x-object-name" ) type AgentClient struct { @@ -182,7 +191,7 @@ func (a *AgentClient) UptimeMillis() time.Duration { func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, subject string, data []byte) (*nats.Msg, error) { intmsg := nats.NewMsg(fmt.Sprintf("agentint.%s.trigger", a.agentID)) // TODO: inject tracer context into message header - intmsg.Header.Add(nexTriggerSubject, subject) + intmsg.Header.Add(NexTriggerSubject, subject) intmsg.Data = data cctx, childSpan := tracer.Start( diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index 462bbeeb..e1ed3819 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -169,12 +169,11 @@ type HostServicesHTTPResponse struct { Error *string `json:"error,omitempty"` } -type HostServicesKeyValueRequest struct { - Key *string `json:"key"` - Value *json.RawMessage `json:"value,omitempty"` - +type HostServicesKeyValueResponse struct { Revision int64 `json:"revision,omitempty"` Success *bool `json:"success,omitempty"` + + Errors []string `json:"errors,omitempty"` } type HostServicesObjectStoreResponse struct { diff --git a/internal/node/services/lib/http.go b/internal/node/services/lib/http.go index b32009ed..85fdb73c 100644 --- a/internal/node/services/lib/http.go +++ b/internal/node/services/lib/http.go @@ -74,23 +74,7 @@ func (h *HTTPService) HandleRPC(msg *nats.Msg) { } func (h *HTTPService) handleGet(msg *nats.Msg) { - var req *agentapi.HostServicesHTTPRequest - err := json.Unmarshal(msg.Data, &req) - if err != nil { - h.log.Warn(fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - url, err := url.Parse(req.URL) + url, err := url.Parse(msg.Header.Get(agentapi.HttpURLHeader)) if err != nil { h.log.Debug("failed to parse url for http RPC request", slog.String("error", err.Error())) @@ -109,25 +93,7 @@ func (h *HTTPService) handleGet(msg *nats.Msg) { WithLogger(h.log). WithTimeoutMillis(defaultHTTPRequestTimeoutMillis) - params := map[string]interface{}{} - if req.Params != nil { - err = json.Unmarshal(*req.Params, ¶ms) - if err != nil { - h.log.Debug("failed to unmarshal query params for http RPC request", slog.String("error", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal query params for http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - } - - status, resphdrs, httpresp, err := client.Get(url.Path, params) + status, resphdrs, httpresp, err := client.Get(url.Path, url.Query()) if err != nil { resp, _ := json.Marshal(map[string]interface{}{ "error": fmt.Sprintf("http reqeust failed: %s", err.Error()), @@ -155,23 +121,7 @@ func (h *HTTPService) handleGet(msg *nats.Msg) { } func (h *HTTPService) handlePost(msg *nats.Msg) { - var req *agentapi.HostServicesHTTPRequest - err := json.Unmarshal(msg.Data, &req) - if err != nil { - h.log.Warn(fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - url, err := url.Parse(req.URL) + url, err := url.Parse(msg.Header.Get(agentapi.HttpURLHeader)) if err != nil { h.log.Debug("failed to parse url for http RPC request", slog.String("error", err.Error())) @@ -190,25 +140,7 @@ func (h *HTTPService) handlePost(msg *nats.Msg) { WithLogger(h.log). WithTimeoutMillis(defaultHTTPRequestTimeoutMillis) - params := map[string]interface{}{} - if req.Params != nil { - err = json.Unmarshal(*req.Params, ¶ms) - if err != nil { - h.log.Debug("failed to unmarshal query params for http RPC request", slog.String("error", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal query params for http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - } - - status, resphdrs, httpresp, err := client.Post(url.Path, req.Body) + status, resphdrs, httpresp, err := client.Post(url.Path, msg.Data) if err != nil { resp, _ := json.Marshal(map[string]interface{}{ "error": fmt.Sprintf("http reqeust failed: %s", err.Error()), @@ -236,23 +168,7 @@ func (h *HTTPService) handlePost(msg *nats.Msg) { } func (h *HTTPService) handlePut(msg *nats.Msg) { - var req *agentapi.HostServicesHTTPRequest - err := json.Unmarshal(msg.Data, &req) - if err != nil { - h.log.Warn(fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - url, err := url.Parse(req.URL) + url, err := url.Parse(msg.Header.Get(agentapi.HttpURLHeader)) if err != nil { h.log.Debug("failed to parse url for http RPC request", slog.String("error", err.Error())) @@ -271,25 +187,7 @@ func (h *HTTPService) handlePut(msg *nats.Msg) { WithLogger(h.log). WithTimeoutMillis(defaultHTTPRequestTimeoutMillis) - params := map[string]interface{}{} - if req.Params != nil { - err = json.Unmarshal(*req.Params, ¶ms) - if err != nil { - h.log.Debug("failed to unmarshal query params for http RPC request", slog.String("error", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal query params for http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - } - - status, resphdrs, httpresp, err := client.Put(url.Path, req.Body) + status, resphdrs, httpresp, err := client.Put(url.Path, msg.Data) if err != nil { resp, _ := json.Marshal(map[string]interface{}{ "error": fmt.Sprintf("http reqeust failed: %s", err.Error()), @@ -317,23 +215,7 @@ func (h *HTTPService) handlePut(msg *nats.Msg) { } func (h *HTTPService) handlePatch(msg *nats.Msg) { - var req *agentapi.HostServicesHTTPRequest - err := json.Unmarshal(msg.Data, &req) - if err != nil { - h.log.Warn(fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - url, err := url.Parse(req.URL) + url, err := url.Parse(msg.Header.Get(agentapi.HttpURLHeader)) if err != nil { h.log.Debug("failed to parse url for http RPC request", slog.String("error", err.Error())) @@ -352,25 +234,7 @@ func (h *HTTPService) handlePatch(msg *nats.Msg) { WithLogger(h.log). WithTimeoutMillis(defaultHTTPRequestTimeoutMillis) - params := map[string]interface{}{} - if req.Params != nil { - err = json.Unmarshal(*req.Params, ¶ms) - if err != nil { - h.log.Debug("failed to unmarshal query params for http RPC request", slog.String("error", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal query params for http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - } - - status, resphdrs, httpresp, err := client.Patch(url.Path, req.Body) + status, resphdrs, httpresp, err := client.Patch(url.Path, msg.Data) if err != nil { resp, _ := json.Marshal(map[string]interface{}{ "error": fmt.Sprintf("http reqeust failed: %s", err.Error()), @@ -398,23 +262,7 @@ func (h *HTTPService) handlePatch(msg *nats.Msg) { } func (h *HTTPService) handleDelete(msg *nats.Msg) { - var req *agentapi.HostServicesHTTPRequest - err := json.Unmarshal(msg.Data, &req) - if err != nil { - h.log.Warn(fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - url, err := url.Parse(req.URL) + url, err := url.Parse(msg.Header.Get(agentapi.HttpURLHeader)) if err != nil { h.log.Debug("failed to parse url for http RPC request", slog.String("error", err.Error())) @@ -433,24 +281,6 @@ func (h *HTTPService) handleDelete(msg *nats.Msg) { WithLogger(h.log). WithTimeoutMillis(defaultHTTPRequestTimeoutMillis) - params := map[string]interface{}{} - if req.Params != nil { - err = json.Unmarshal(*req.Params, ¶ms) - if err != nil { - h.log.Debug("failed to unmarshal query params for http RPC request", slog.String("error", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal query params for http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - } - status, resphdrs, httpresp, err := client.Delete(url.Path) if err != nil { resp, _ := json.Marshal(map[string]interface{}{ @@ -479,23 +309,7 @@ func (h *HTTPService) handleDelete(msg *nats.Msg) { } func (h *HTTPService) handleHead(msg *nats.Msg) { - var req *agentapi.HostServicesHTTPRequest - err := json.Unmarshal(msg.Data, &req) - if err != nil { - h.log.Warn(fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - url, err := url.Parse(req.URL) + url, err := url.Parse(msg.Header.Get(agentapi.HttpURLHeader)) if err != nil { h.log.Debug("failed to parse url for http RPC request", slog.String("error", err.Error())) @@ -514,25 +328,7 @@ func (h *HTTPService) handleHead(msg *nats.Msg) { WithLogger(h.log). WithTimeoutMillis(defaultHTTPRequestTimeoutMillis) - params := map[string]interface{}{} - if req.Params != nil { - err = json.Unmarshal(*req.Params, ¶ms) - if err != nil { - h.log.Debug("failed to unmarshal query params for http RPC request", slog.String("error", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal query params for http RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - h.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - } - - status, resphdrs, err := client.Head(url.Path, params) + status, resphdrs, err := client.Head(url.Path, url.Query()) if err != nil { resp, _ := json.Marshal(map[string]interface{}{ "error": fmt.Sprintf("http reqeust failed: %s", err.Error()), diff --git a/internal/node/services/lib/keyvalue.go b/internal/node/services/lib/keyvalue.go index 8c520307..90482c28 100644 --- a/internal/node/services/lib/keyvalue.go +++ b/internal/node/services/lib/keyvalue.go @@ -74,25 +74,10 @@ func (k *KeyValueService) handleGet(msg *nats.Msg) { k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) } - var req *agentapi.HostServicesKeyValueRequest - err = json.Unmarshal(msg.Data, &req) - if err != nil { - k.log.Warn(fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - if req.Key == nil { - resp, _ := json.Marshal(map[string]interface{}{ - "error": "key is required", + key := msg.Header.Get(agentapi.KeyValueKeyHeader) + if key == "" { + resp, _ := json.Marshal(&agentapi.HostServicesKeyValueResponse{ + Errors: []string{"key is required"}, }) err := msg.Respond(resp) @@ -102,12 +87,12 @@ func (k *KeyValueService) handleGet(msg *nats.Msg) { return } - entry, err := kvStore.Get(*req.Key) + entry, err := kvStore.Get(key) if err != nil { - k.log.Warn(fmt.Sprintf("failed to get value for key %s: %s", *req.Key, err.Error())) + k.log.Warn(fmt.Sprintf("failed to get value for key %s: %s", key, err.Error())) resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to delete keys %s: %s", *req.Key, err.Error()), + "error": fmt.Sprintf("failed to get value for key %s: %s", key, err.Error()), }) err := msg.Respond(resp) @@ -117,12 +102,7 @@ func (k *KeyValueService) handleGet(msg *nats.Msg) { return } - val := json.RawMessage(entry.Value()) - resp, _ := json.Marshal(&agentapi.HostServicesKeyValueRequest{ - Key: req.Key, - Value: &val, - }) - err = msg.Respond(resp) + err = msg.Respond(entry.Value()) if err != nil { k.log.Warn(fmt.Sprintf("failed to respond to key/value host service request: %s", err.Error())) } @@ -138,13 +118,10 @@ func (k *KeyValueService) handleSet(msg *nats.Msg) { k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) } - var req *agentapi.HostServicesKeyValueRequest - err = json.Unmarshal(msg.Data, &req) - if err != nil { - k.log.Warn(fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error()), + key := msg.Header.Get(agentapi.KeyValueKeyHeader) + if key == "" { + resp, _ := json.Marshal(&agentapi.HostServicesKeyValueResponse{ + Errors: []string{"key is required"}, }) err := msg.Respond(resp) @@ -154,38 +131,12 @@ func (k *KeyValueService) handleSet(msg *nats.Msg) { return } - // FIXME-- add req.Validate() - if req.Key == nil { - resp, _ := json.Marshal(map[string]interface{}{ - "error": "key is required", - }) - - err := msg.Respond(resp) - if err != nil { - k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - // FIXME-- add req.Validate() - if req.Value == nil { - resp, _ := json.Marshal(map[string]interface{}{ - "error": "value is required", - }) - - err := msg.Respond(resp) - if err != nil { - k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - revision, err := kvStore.Put(*req.Key, *req.Value) + revision, err := kvStore.Put(key, msg.Data) if err != nil { - k.log.Warn(fmt.Sprintf("failed to write %d-byte value for key %s: %s", len(*req.Value), *req.Key, err.Error())) + k.log.Warn(fmt.Sprintf("failed to write %d-byte value for key %s: %s", len(msg.Data), key, err.Error())) resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to write %d-byte value for key %s: %s", len(*req.Value), *req.Key, err.Error()), + "error": fmt.Sprintf("failed to write %d-byte value for key %s: %s", len(msg.Data), key, err.Error()), }) err := msg.Respond(resp) @@ -215,25 +166,10 @@ func (k *KeyValueService) handleDelete(msg *nats.Msg) { k.log.Warn(fmt.Sprintf("failed to resolve key/value store: %s", err.Error())) } - var req *agentapi.HostServicesKeyValueRequest - err = json.Unmarshal(msg.Data, &req) - if err != nil { - k.log.Warn(fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error())) - - resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to unmarshal key/value RPC request: %s", err.Error()), - }) - - err := msg.Respond(resp) - if err != nil { - k.log.Error(fmt.Sprintf("failed to respond to host services RPC request: %s", err.Error())) - } - return - } - - if req.Key == nil { - resp, _ := json.Marshal(map[string]interface{}{ - "error": "key is required", + key := msg.Header.Get(agentapi.KeyValueKeyHeader) + if key == "" { + resp, _ := json.Marshal(&agentapi.HostServicesKeyValueResponse{ + Errors: []string{"key is required"}, }) err := msg.Respond(resp) @@ -243,12 +179,12 @@ func (k *KeyValueService) handleDelete(msg *nats.Msg) { return } - err = kvStore.Delete(*req.Key) + err = kvStore.Delete(key) if err != nil { - k.log.Warn(fmt.Sprintf("failed to delete key %s: %s", *req.Key, err.Error())) + k.log.Warn(fmt.Sprintf("failed to delete key %s: %s", key, err.Error())) resp, _ := json.Marshal(map[string]interface{}{ - "error": fmt.Sprintf("failed to delete keys %s: %s", *req.Key, err.Error()), + "error": fmt.Sprintf("failed to delete key %s: %s", key, err.Error()), }) err := msg.Respond(resp) diff --git a/internal/node/services/lib/messaging.go b/internal/node/services/lib/messaging.go index 5fb0d7ef..c0c53a79 100644 --- a/internal/node/services/lib/messaging.go +++ b/internal/node/services/lib/messaging.go @@ -19,8 +19,6 @@ const messagingServiceMethodRequestMany = "requestMany" const messagingRequestTimeout = time.Millisecond * 500 // FIXME-- make timeout configurable per request? const messagingRequestManyTimeout = time.Millisecond * 3000 -const messageSubject = "x-subject" - type MessagingService struct { log *slog.Logger nc *nats.Conn @@ -66,7 +64,7 @@ func (m *MessagingService) HandleRPC(msg *nats.Msg) { } func (m *MessagingService) handlePublish(msg *nats.Msg) { - subject := msg.Header.Get(messageSubject) + subject := msg.Header.Get(agentapi.MessagingSubjectHeader) if subject == "" { resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ Errors: []string{"subject is required"}, @@ -104,7 +102,7 @@ func (m *MessagingService) handlePublish(msg *nats.Msg) { } func (m *MessagingService) handleRequest(msg *nats.Msg) { - subject := msg.Header.Get(messageSubject) + subject := msg.Header.Get(agentapi.MessagingSubjectHeader) if subject == "" { resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ Errors: []string{"subject is required"}, @@ -141,7 +139,7 @@ func (m *MessagingService) handleRequest(msg *nats.Msg) { } func (m *MessagingService) handleRequestMany(msg *nats.Msg) { - subject := msg.Header.Get(messageSubject) + subject := msg.Header.Get(agentapi.MessagingSubjectHeader) if subject == "" { resp, _ := json.Marshal(&agentapi.HostServicesMessagingResponse{ Errors: []string{"subject is required"}, diff --git a/internal/node/services/lib/objectstore.go b/internal/node/services/lib/objectstore.go index 1130b03f..5004e6d2 100644 --- a/internal/node/services/lib/objectstore.go +++ b/internal/node/services/lib/objectstore.go @@ -19,8 +19,6 @@ const objectStoreServiceMethodPut = "put" const objectStoreServiceMethodDelete = "delete" const objectStoreServiceMethodList = "list" -const objectName = "x-object-name" - type ObjectStoreService struct { log *slog.Logger nc *nats.Conn @@ -79,7 +77,7 @@ func (o *ObjectStoreService) handleGet(msg *nats.Msg) { o.log.Warn(fmt.Sprintf("failed to resolve object store: %s", err.Error())) } - name := msg.Header.Get(objectName) + name := msg.Header.Get(agentapi.ObjectStoreObjectNameHeader) if name == "" { resp, _ := json.Marshal(&agentapi.HostServicesObjectStoreResponse{ Errors: []string{"name is required"}, @@ -138,7 +136,7 @@ func (o *ObjectStoreService) handlePut(msg *nats.Msg) { o.log.Warn(fmt.Sprintf("failed to resolve object store: %s", err.Error())) } - name := msg.Header.Get(objectName) + name := msg.Header.Get(agentapi.ObjectStoreObjectNameHeader) if name == "" { resp, _ := json.Marshal(&agentapi.HostServicesObjectStoreResponse{ Errors: []string{"name is required"}, @@ -190,7 +188,7 @@ func (o *ObjectStoreService) handleDelete(msg *nats.Msg) { o.log.Warn(fmt.Sprintf("failed to resolve object store: %s", err.Error())) } - name := msg.Header.Get(objectName) + name := msg.Header.Get(agentapi.ObjectStoreObjectNameHeader) if name == "" { resp, _ := json.Marshal(&agentapi.HostServicesObjectStoreResponse{ Errors: []string{"name is required"}, diff --git a/internal/node/workload_mgr.go b/internal/node/workload_mgr.go index eadbe131..27abcbc9 100644 --- a/internal/node/workload_mgr.go +++ b/internal/node/workload_mgr.go @@ -33,8 +33,6 @@ const ( WorkloadCacheBucketName = "NEXCACHE" defaultHandshakeTimeoutMillis = 5000 - - nexRuntimeNs = "x-nex-runtime-ns" ) // The workload manager provides the high level strategy for the Nex node's workload management. It is responsible @@ -484,7 +482,7 @@ func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string, _ = w.publishFunctionExecFailed(workloadID, *request.WorkloadName, tsub, err) } else if resp != nil { parentSpan.SetStatus(codes.Ok, "Trigger succeeded") - runtimeNs := resp.Header.Get(nexRuntimeNs) + runtimeNs := resp.Header.Get(agentapi.NexRuntimeNs) w.log.Debug("Received response from execution via trigger subject", slog.String("workload_id", workloadID), slog.String("trigger_subject", tsub), diff --git a/spec/node_test.go b/spec/node_test.go index 81e4b90e..4f72cb7f 100644 --- a/spec/node_test.go +++ b/spec/node_test.go @@ -618,15 +618,15 @@ var _ = Describe("nex node", func() { BeforeEach(func() { sub, _ = _fixtures.natsConn.Subscribe("hello.world.request", func(msg *nats.Msg) { - resp := fmt.Sprintf("resp: %s", string(msg.Data)) + resp := fmt.Sprintf("resp: %s", msg.Data) _ = msg.Respond([]byte(resp)) }) sub, _ = _fixtures.natsConn.Subscribe("hello.world.request.many", func(msg *nats.Msg) { - resp := fmt.Sprintf("resp #1: %s", string(msg.Data)) + resp := fmt.Sprintf("resp #1: %s", msg.Data) _ = msg.Respond([]byte(resp)) - resp = fmt.Sprintf("resp #2: %s", string(msg.Data)) + resp = fmt.Sprintf("resp #2: %s", msg.Data) _ = msg.Respond([]byte(resp)) }) }) @@ -850,8 +850,6 @@ var _ = Describe("nex node", func() { Expect(resp).ToNot(BeNil()) - fmt.Printf("IN TEST NAME: %s; NUID: %s", resp.List[0].Name, resp.List[0].NUID) - Expect(len(resp.List)).To(Equal(1)) Expect(resp.List[0].Name).To(Equal("hello2")) Expect(resp.Hello2).To(Equal("hello!"))