Skip to content

Commit

Permalink
Add tracing to host services (#222)
Browse files Browse the repository at this point in the history
Adds otel child span support for tracing host services execution
  • Loading branch information
kthomas authored May 15, 2024
1 parent 947a70e commit 25f07f8
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 104 deletions.
7 changes: 7 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/nats-io/nats.go"
"github.com/synadia-io/nex/agent/providers"
agentapi "github.com/synadia-io/nex/internal/agent-api"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

const defaultAgentHandshakeTimeoutMillis = 500
Expand Down Expand Up @@ -344,6 +346,11 @@ func (a *Agent) handleHealthz(w http.ResponseWriter, req *http.Request) {
func (a *Agent) init() error {
a.installSignalHandlers()

otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))

err := a.requestHandshake()
if err != nil {
a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err))
Expand Down
3 changes: 2 additions & 1 deletion agent/providers/api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package providers

import (
"context"
"errors"

"github.com/synadia-io/nex/agent/providers/lib"
Expand All @@ -27,7 +28,7 @@ type ExecutionProvider interface {
Deploy() error

// Execute a deployed function, if supported by the execution provider implementation (e.g., "v8" and "wasm" types)
Execute(subject string, payload []byte) ([]byte, error)
Execute(ctx context.Context, payload []byte) ([]byte, error)

// Undeploy a workload, giving it a chance to gracefully clean up after itself (if applicable)
Undeploy() error
Expand Down
3 changes: 2 additions & 1 deletion agent/providers/lib/elf.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lib

import (
"context"
"debug/elf"
"errors"
"fmt"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (e *ELF) removeWorkload() {
_ = os.Remove(e.tmpFilename)
}

func (e *ELF) Execute(subject string, payload []byte) ([]byte, error) {
func (e *ELF) Execute(ctx context.Context, payload []byte) ([]byte, error) {
return nil, errors.New("ELF execution provider does not support execution via trigger subjects")
}

Expand Down
3 changes: 2 additions & 1 deletion agent/providers/lib/oci.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lib

import (
"context"
"errors"

agentapi "github.com/synadia-io/nex/internal/agent-api"
Expand All @@ -14,7 +15,7 @@ func (o *OCI) Deploy() error {
return errors.New("oci execution provider not yet implemented")
}

func (o *OCI) Execute(subject string, payload []byte) ([]byte, error) {
func (o *OCI) Execute(ctx context.Context, payload []byte) ([]byte, error) {
return nil, errors.New("oci execution provider does not support execution via trigger subjects")
}

Expand Down
95 changes: 56 additions & 39 deletions agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package lib

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -18,6 +19,8 @@ import (
hostservices "github.com/synadia-io/nex/host-services"
"github.com/synadia-io/nex/host-services/builtins"
agentapi "github.com/synadia-io/nex/internal/agent-api"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
v8 "rogchap.com/v8go"
)

Expand Down Expand Up @@ -96,20 +99,26 @@ func (v *V8) Deploy() error {

subject := fmt.Sprintf("agentint.%s.trigger", v.vmID)
_, err := v.nc.Subscribe(subject, func(msg *nats.Msg) {
ctx := context.WithValue(context.Background(), agentapi.NexTriggerSubject, msg.Header.Get(agentapi.NexTriggerSubject)) //nolint:all
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(msg.Header))

startTime := time.Now()
val, err := v.Execute(msg.Header.Get(agentapi.NexTriggerSubject), msg.Data)
val, err := v.Execute(ctx, msg.Data)
if err != nil {
_, _ = v.stderr.Write([]byte(fmt.Sprintf("failed to execute function on trigger subject %s: %s", subject, err.Error())))
return
}

runtimeNanos := time.Since(startTime).Nanoseconds()

header := nats.Header{
agentapi.NexRuntimeNs: []string{strconv.FormatInt(runtimeNanos, 10)},
}
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header))

err = msg.RespondMsg(&nats.Msg{
Data: val,
Header: nats.Header{
agentapi.NexRuntimeNs: []string{strconv.FormatInt(runtimeNanos, 10)},
},
Data: val,
Header: header,
})
if err != nil {
_, _ = v.stderr.Write([]byte(fmt.Sprintf("failed to write %d-byte response: %s", len(val), err.Error())))
Expand All @@ -127,12 +136,20 @@ func (v *V8) Deploy() error {
// Trigger execution of the deployed function; expects a `Validate` to have succeeded and `ubs` to be non-nil.
// The executed function can optionally return a value, in which case it will be deemed a reply and returned
// to the caller. In the case of a nil or empty value returned by the function, no reply will be sent.
func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
func (v *V8) Execute(ctx context.Context, payload []byte) ([]byte, error) {
if v.ubs == nil {
return nil, fmt.Errorf("invalid state for execution; no compiled code available for vm: %s", v.name)
}

ctx, err := v.newV8Context()
var subject string
sub, ok := ctx.Value(agentapi.NexTriggerSubject).(string)
if ok {
subject = sub
} else {
return nil, fmt.Errorf("failed to initialize context in vm; no trigger subject provided in context: %s", v.name)
}

v8ctx, err := v.newV8Context(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize context in vm: %s", err.Error())
}
Expand All @@ -141,7 +158,7 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
errs := make(chan error, 1)

go func() {
val, err := v.ubs.Run(ctx)
val, err := v.ubs.Run(v8ctx)
if err != nil {
errs <- err
return
Expand All @@ -153,7 +170,7 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
return
}

argv1, err := v8.NewValue(ctx.Isolate(), subject)
argv1, err := v8.NewValue(v8ctx.Isolate(), subject)
if err != nil {
errs <- err
return
Expand All @@ -166,7 +183,7 @@ func (v *V8) Execute(subject string, payload []byte) ([]byte, error) {
return
}

val, err = fn.Call(ctx.Global(), argv1, argv2)
val, err = fn.Call(v8ctx.Global(), argv1, argv2)
if err != nil {
errs <- err
return
Expand Down Expand Up @@ -270,10 +287,10 @@ func (v *V8) initUtils() {
v.utils[v8FunctionUInt8ArrayToString] = uint8arrtostrfn
}

func (v *V8) newV8Context() (*v8.Context, error) {
func (v *V8) newV8Context(ctx context.Context) (*v8.Context, error) {
global := v8.NewObjectTemplate(v.iso)

hostServices, err := v.newHostServicesTemplate()
hostServices, err := v.newHostServicesTemplate(ctx)
if err != nil {
return nil, err
}
Expand All @@ -286,63 +303,63 @@ func (v *V8) newV8Context() (*v8.Context, error) {
return v8.NewContext(v.iso, global), nil
}

func (v *V8) newHostServicesTemplate() (*v8.ObjectTemplate, error) {
func (v *V8) newHostServicesTemplate(ctx context.Context) (*v8.ObjectTemplate, error) {
hostServices := v8.NewObjectTemplate(v.iso)

err := hostServices.Set(hostServicesHTTPObjectName, v.newHTTPObjectTemplate())
err := hostServices.Set(hostServicesHTTPObjectName, v.newHTTPObjectTemplate(ctx))
if err != nil {
return nil, err
}

err = hostServices.Set(hostServicesKVObjectName, v.newKeyValueObjectTemplate())
err = hostServices.Set(hostServicesKVObjectName, v.newKeyValueObjectTemplate(ctx))
if err != nil {
return nil, err
}

err = hostServices.Set(hostServicesMessagingObjectName, v.newMessagingObjectTemplate())
err = hostServices.Set(hostServicesMessagingObjectName, v.newMessagingObjectTemplate(ctx))
if err != nil {
return nil, err
}

err = hostServices.Set(hostServicesObjectStoreObjectName, v.newObjectStoreObjectTemplate())
err = hostServices.Set(hostServicesObjectStoreObjectName, v.newObjectStoreObjectTemplate(ctx))
if err != nil {
return nil, err
}

return hostServices, nil
}

func (v *V8) newHTTPObjectTemplate() *v8.ObjectTemplate {
func (v *V8) newHTTPObjectTemplate(ctx context.Context) *v8.ObjectTemplate {
http := v8.NewObjectTemplate(v.iso)

_ = http.Set(hostServicesHTTPGetFunctionName, v8.NewFunctionTemplate(
v.iso, v.genHttpClientFunc(hostServicesHTTPGetFunctionName),
v.iso, v.genHttpClientFunc(ctx, hostServicesHTTPGetFunctionName),
))

_ = http.Set(hostServicesHTTPPostFunctionName, v8.NewFunctionTemplate(
v.iso, v.genHttpClientFunc(hostServicesHTTPPostFunctionName),
v.iso, v.genHttpClientFunc(ctx, hostServicesHTTPPostFunctionName),
))

_ = http.Set(hostServicesHTTPPutFunctionName, v8.NewFunctionTemplate(
v.iso, v.genHttpClientFunc(hostServicesHTTPPutFunctionName),
v.iso, v.genHttpClientFunc(ctx, hostServicesHTTPPutFunctionName),
))

_ = http.Set(hostServicesHTTPPatchFunctionName, v8.NewFunctionTemplate(
v.iso, v.genHttpClientFunc(hostServicesHTTPPatchFunctionName),
v.iso, v.genHttpClientFunc(ctx, hostServicesHTTPPatchFunctionName),
))

_ = http.Set(hostServicesHTTPDeleteFunctionName, v8.NewFunctionTemplate(
v.iso, v.genHttpClientFunc(hostServicesHTTPDeleteFunctionName),
v.iso, v.genHttpClientFunc(ctx, hostServicesHTTPDeleteFunctionName),
))

_ = http.Set(hostServicesHTTPHeadFunctionName, v8.NewFunctionTemplate(
v.iso, v.genHttpClientFunc(hostServicesHTTPHeadFunctionName),
v.iso, v.genHttpClientFunc(ctx, hostServicesHTTPHeadFunctionName),
))

return http
}

func (v *V8) genHttpClientFunc(method string) func(info *v8.FunctionCallbackInfo) *v8.Value {
func (v *V8) genHttpClientFunc(ctx context.Context, method string) func(info *v8.FunctionCallbackInfo) *v8.Value {
return func(info *v8.FunctionCallbackInfo) *v8.Value {
args := info.Args()
if len(args) == 0 {
Expand All @@ -367,7 +384,7 @@ func (v *V8) genHttpClientFunc(method string) func(info *v8.FunctionCallbackInfo
}
}

httpresp, err := v.builtins.SimpleHttpRequest(method, url.String(), payload)
httpresp, err := v.builtins.SimpleHttpRequest(ctx, method, url.String(), payload)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand Down Expand Up @@ -419,7 +436,7 @@ func (v *V8) genHttpClientFunc(method string) func(info *v8.FunctionCallbackInfo
}
}

func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate {
func (v *V8) newKeyValueObjectTemplate(ctx context.Context) *v8.ObjectTemplate {
kv := v8.NewObjectTemplate(v.iso)

_ = kv.Set(hostServicesKVGetFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {
Expand All @@ -431,7 +448,7 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate {

key := args[0].String()

resp, err := v.builtins.KVGet(key)
resp, err := v.builtins.KVGet(ctx, key)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand Down Expand Up @@ -462,7 +479,7 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate {
return v.iso.ThrowException(val)
}

kvresp, err := v.builtins.KVSet(key, value)
kvresp, err := v.builtins.KVSet(ctx, key, value)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand All @@ -485,7 +502,7 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate {

key := args[0].String()

kvresp, err := v.builtins.KVDelete(key)
kvresp, err := v.builtins.KVDelete(ctx, key)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand All @@ -501,7 +518,7 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate {

_ = kv.Set(hostServicesKVKeysFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {

resp, err := v.builtins.KVKeys()
resp, err := v.builtins.KVKeys(ctx)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand All @@ -522,7 +539,7 @@ func (v *V8) newKeyValueObjectTemplate() *v8.ObjectTemplate {
return kv
}

func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate {
func (v *V8) newMessagingObjectTemplate(ctx context.Context) *v8.ObjectTemplate {
messaging := v8.NewObjectTemplate(v.iso)

_ = messaging.Set(hostServicesMessagingPublishFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {
Expand All @@ -539,7 +556,7 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate {
return v.iso.ThrowException(val)
}

err = v.builtins.MessagingPublish(subject, payload)
err = v.builtins.MessagingPublish(ctx, subject, payload)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand All @@ -562,7 +579,7 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate {
return v.iso.ThrowException(val)
}

resp, err := v.builtins.MessagingRequest(subject, payload)
resp, err := v.builtins.MessagingRequest(ctx, subject, payload)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand Down Expand Up @@ -664,7 +681,7 @@ func (v *V8) newMessagingObjectTemplate() *v8.ObjectTemplate {
return messaging
}

func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate {
func (v *V8) newObjectStoreObjectTemplate(ctx context.Context) *v8.ObjectTemplate {
objectStore := v8.NewObjectTemplate(v.iso)

_ = objectStore.Set(hostServicesObjectStoreGetFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {
Expand All @@ -675,7 +692,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate {
}

name := args[0].String()
resp, err := v.builtins.ObjectGet(name)
resp, err := v.builtins.ObjectGet(ctx, name)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand Down Expand Up @@ -706,7 +723,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate {
return v.iso.ThrowException(val)
}

resp, err := v.builtins.ObjectPut(name, value)
resp, err := v.builtins.ObjectPut(ctx, name, value)

if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
Expand All @@ -733,7 +750,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate {
}

name := args[0].String()
err := v.builtins.ObjectDelete(name)
err := v.builtins.ObjectDelete(ctx, name)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand All @@ -743,7 +760,7 @@ func (v *V8) newObjectStoreObjectTemplate() *v8.ObjectTemplate {
}))

_ = objectStore.Set(hostServicesObjectStoreListFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {
resp, err := v.builtins.ObjectList()
resp, err := v.builtins.ObjectList(ctx)
if err != nil {
val, _ := v8.NewValue(v.iso, err.Error())
return v.iso.ThrowException(val)
Expand Down
Loading

0 comments on commit 25f07f8

Please sign in to comment.