Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved opentracing support #222

Merged
merged 1 commit into from
Oct 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ using [telepresence](https://telepresence.io):
telepresence --method=vpn-tcp --namespace fission --swap-deployment workflows:workflows --expose 5555 --expose 8080
```

### Local OpenTracing

The locally running instance does not have access to the in-cluster Jaeger deployment. To view the invocations, the
easiest option is to run a development all-in-one Jaeger deployment locally:

```bash
docker run -d --rm --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 9411:9411 \
jaegertracing/all-in-one:1.6
```

You can then navigate to `http://localhost:16686` to access the Jaeger UI.

## Testing

To run local unit and integration tests:
Expand Down
14 changes: 12 additions & 2 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,24 @@ func Run(ctx context.Context, opts *Options) error {
var es fes.Backend
var esPub pubsub.Publisher

var otOpts = []grpc_opentracing.Option{
grpc_opentracing.SpanDecorator(func(span opentracing.Span, method string, req, resp interface{},
grpcError error) {
span.SetTag("level", log.GetLevel().String())
}),
}
if opts.Debug {
otOpts = append(otOpts, grpc_opentracing.LogPayloads())
}

grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_prometheus.StreamServerInterceptor,
grpc_opentracing.OpenTracingStreamServerInterceptor(tracer),
grpc_opentracing.OpenTracingStreamServerInterceptor(tracer, otOpts...),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_prometheus.UnaryServerInterceptor,
grpc_opentracing.OpenTracingServerInterceptor(tracer),
grpc_opentracing.OpenTracingServerInterceptor(tracer, otOpts...),
)),
)

Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/actions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -98,3 +99,11 @@ func (a *MultiAction) Apply() error {
}
return err.(error)
}

func (a *MultiAction) String() string {
var results []string
for _, action := range a.Actions {
results = append(results, fmt.Sprintf("%+v", action))
}
return fmt.Sprintf("%v", results)
}
28 changes: 14 additions & 14 deletions pkg/controller/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
)

Expand All @@ -22,9 +20,9 @@ type EvalStore struct {
mp sync.Map
}

func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState {
s, _ := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx))
return s.(*EvalState)
func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) (*EvalState, bool) {
s, ok := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx))
return s.(*EvalState), ok
}

func (e *EvalStore) Load(id string) (*EvalState, bool) {
Expand Down Expand Up @@ -90,16 +88,15 @@ func NewEvalState(id string, spanCtx opentracing.SpanContext) *EvalState {
log: EvalLog{},
id: id,
evalLock: make(chan struct{}, 1),
span: opentracing.StartSpan("EvalState", opentracing.FollowsFrom(spanCtx)),
span: opentracing.StartSpan("/controller/eval", opentracing.FollowsFrom(spanCtx)),
}
e.span.SetTag(string(ext.Component), "controller.workflow")
e.span.SetTag("workflow.id", id)
e.span.SetTag("id", id)
e.Free()
return e
}

func (e *EvalState) Span() opentracing.SpanContext {
return e.span.Context()
func (e *EvalState) Span() opentracing.Span {
return e.span
}

func (e *EvalState) IsFinished() bool {
Expand All @@ -122,10 +119,6 @@ func (e *EvalState) Finish(success bool, msg ...string) {
e.finished = true
}

func (e *EvalState) Log(fields ...log.Field) {
e.span.LogFields(fields...)
}

func (e *EvalState) Close() error {
e.Finish(false, "controller closed")
return nil
Expand Down Expand Up @@ -195,6 +188,13 @@ func (e *EvalState) Logs() EvalLog {
func (e *EvalState) Record(record EvalRecord) {
e.dataLock.Lock()
e.log.Record(record)
var eval interface{}
if record.Error != nil {
eval = fmt.Sprintf("error: %v", record.Error)
} else {
eval = fmt.Sprintf("action: %T - %+v", record.Action, record.Action)
}
e.Span().LogKV("evaluation", eval)
e.dataLock.Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/evaluation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestEvalCache_GetOrCreate(t *testing.T) {
assert.False(t, ok)
assert.Empty(t, es)

es = ec.LoadOrStore(id, nil)
es, _ = ec.LoadOrStore(id, nil)
assert.Equal(t, id, es.ID())

es, ok = ec.Load(id)
Expand Down
74 changes: 62 additions & 12 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,64 @@ func (a *ActionInvokeTask) logger() logrus.FieldLogger {
})
}

func (a *ActionInvokeTask) String() string {
return fmt.Sprintf("task/run(%s)", a.Task.GetId())
}

func (a *ActionInvokeTask) Apply() error {
log := a.logger()
span := opentracing.StartSpan("pkg/controller/invocation/actions/InvokeTask",
opentracing.ChildOf(a.ec.Span()))
span := opentracing.StartSpan(fmt.Sprintf("/task/%s", a.Task.GetId()), opentracing.ChildOf(a.ec.Span().Context()))
span.SetTag("task", a.Task.GetId())
defer span.Finish()

// Find task
task, ok := types.GetTask(a.Wf, a.Wfi, a.Task.Id)
if !ok {
return fmt.Errorf("task '%v' could not be found", a.Wfi.ID())
err := fmt.Errorf("task '%v' could not be found", a.Wfi.ID())
span.LogKV("error", err)
return err
}

span.SetTag("fnref", task.GetStatus().GetFnRef())
if logrus.GetLevel() == logrus.DebugLevel {
var err error
var inputs interface{}
inputs, err = typedvalues.UnwrapMapTypedValue(task.GetSpec().GetInputs())
if err != nil {
inputs = fmt.Sprintf("error: %v", err)
}
span.LogKV("inputs", inputs)
}

// Check if function has been resolved
if task.Status.FnRef == nil {
return fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef)
err := fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef)
span.LogKV("error", err)
return err
}

// Pre-execution: Resolve expression inputs
exprEvalStart := time.Now()
inputs, err := a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
return err
var inputs map[string]*typedvalues.TypedValue
if len(a.Task.Inputs) > 0 {
var err error
exprEvalStart := time.Now()
inputs, err = a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
span.LogKV("error", err)
return err
}

if logrus.GetLevel() == logrus.DebugLevel {
var err error
var resolvedInputs interface{}
resolvedInputs, err = typedvalues.UnwrapMapTypedValue(inputs)
if err != nil {
resolvedInputs = fmt.Sprintf("error: %v", err)
}
span.LogKV("resolved_inputs", resolvedInputs)
}
}

// Invoke task
Expand All @@ -127,13 +162,28 @@ func (a *ActionInvokeTask) Apply() error {
log.Debugf("Using inputs: %v", i)
}
}

ctx := opentracing.ContextWithSpan(context.Background(), span)
_, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer))
updated, err := a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer))
if err != nil {
log.Errorf("Failed to execute task: %v", err)
span.LogKV("error", err)
return err
}
span.Finish()
span.SetTag("status", updated.GetStatus().GetStatus().String())
if !updated.GetStatus().Successful() {
span.LogKV("error", updated.GetStatus().GetError().String())
}
if logrus.GetLevel() == logrus.DebugLevel {
var err error
var output interface{}
output, err = typedvalues.Unwrap(updated.GetStatus().GetOutput())
if err != nil {
output = fmt.Sprintf("error: %v", err)
}
span.LogKV("output", output)
}

return nil
}

Expand Down
32 changes: 22 additions & 10 deletions pkg/controller/invocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,22 @@ func (cr *Controller) Notify(update *fes.Notification) error {
return err
}

if es, ok := cr.evalStore.Load(entity.ID()); ok {
es.Span().LogKV("event", fmt.Sprintf("%s - %v", update.EventType, update.Labels()))
}
switch update.EventType {
case events.EventInvocationCompleted:
fallthrough
cr.finishAndDeleteEvalState(entity.ID(), true, "completion reason: "+events.EventInvocationCompleted)
case events.EventInvocationCanceled:
fallthrough
cr.finishAndDeleteEvalState(entity.ID(), false, "completion reason: "+events.EventInvocationCanceled)
case events.EventInvocationFailed:
// TODO mark to clean up later instead
cr.stateStore.Delete(entity.ID())
cr.evalStore.Delete(entity.ID())
log.Debugf("Removed entity %v from eval state", entity.ID())
cr.finishAndDeleteEvalState(entity.ID(), false, "completion reason: "+events.EventInvocationFailed)
case events.EventTaskFailed:
fallthrough
case events.EventTaskSucceeded:
fallthrough
case events.EventInvocationCreated:
es := cr.evalStore.LoadOrStore(entity.ID(), update.SpanCtx)
es, _ := cr.evalStore.LoadOrStore(entity.ID(), update.SpanCtx)
cr.workQueue.Add(es)
default:
log.Debugf("Controller ignored event type: %v", update.EventType)
Expand Down Expand Up @@ -215,9 +215,10 @@ func (cr *Controller) checkModelCaches() error {
}

if !wi.Status.Finished() {
span := opentracing.GlobalTracer().StartSpan("recoverFromModelCache")
// TODO grab the span context from the model / events
span := opentracing.GlobalTracer().StartSpan("/controller/recoverFromModelCache")
controller.EvalRecovered.WithLabelValues(Name, "cache").Inc()
es := cr.evalStore.LoadOrStore(wi.ID(), span.Context())
es, _ := cr.evalStore.LoadOrStore(wi.ID(), span.Context())
cr.workQueue.Add(es)
span.Finish()
}
Expand Down Expand Up @@ -295,7 +296,7 @@ func (cr *Controller) Evaluate(invocationID string) {

controller.EvalDuration.WithLabelValues(Name, fmt.Sprintf("%T", action)).Observe(float64(time.Now().Sub(start)))
if wfi.GetStatus().Finished() {
cr.evalStore.Delete(wfi.ID())
cr.finishAndDeleteEvalState(wfi.ID(), true, "")
t, _ := ptypes.Timestamp(wfi.GetMetadata().GetCreatedAt())
invocationDuration.Observe(float64(time.Now().Sub(t)))
}
Expand Down Expand Up @@ -349,6 +350,17 @@ func (cr *Controller) processNextItem(ctx context.Context, pool *gopool.GoPool)
return true
}

func (cr *Controller) finishAndDeleteEvalState(evalStateID string, success bool, msg string) {
es, ok := cr.evalStore.Load(evalStateID)
if !ok {
return
}
es.Finish(success, msg)
cr.evalStore.Delete(evalStateID)
cr.stateStore.Delete(evalStateID)
log.Debugf("Removed entity %v from eval state", evalStateID)
}

func defaultPolicy(ctr *Controller) controller.Rule {
return &controller.RuleEvalUntilAction{
Rules: []controller.Rule{
Expand Down
8 changes: 8 additions & 0 deletions pkg/fnenv/fission/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
if err := validate.TaskInvocationSpec(spec); err != nil {
return nil, err
}
span, _ := opentracing.StartSpanFromContext(cfg.Ctx, "/fnenv/fission")
defer span.Finish()
fnRef := *spec.FnRef
span.SetTag("fnref", fnRef.Format())

// Construct request and add body
url := fe.createRouterURL(fnRef)
span.SetTag("url", url)
req, err := http.NewRequest(defaultHTTPMethod, url, nil)
if err != nil {
panic(fmt.Errorf("failed to create request for '%v': %v", url, err))
Expand Down Expand Up @@ -96,11 +100,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
}
fmt.Println(string(bs))
fmt.Println("--- HTTP Request end ---")
span.LogKV("HTTP request", string(bs))
}
span.LogKV("http", fmt.Sprintf("%s %v", req.Method, req.URL))
resp, err := http.DefaultClient.Do(req.WithContext(cfg.Ctx))
if err != nil {
return nil, fmt.Errorf("error for reqUrl '%v': %v", url, err)
}
span.LogKV("status code", resp.Status)

fnenv.FnActive.WithLabelValues(Name).Dec()

Expand All @@ -113,6 +120,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
}
fmt.Println(string(bs))
fmt.Println("--- HTTP Response end ---")
span.LogKV("HTTP response", string(bs))
}

// Parse output
Expand Down
2 changes: 1 addition & 1 deletion pkg/fnenv/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
if !ok {
return nil, fmt.Errorf("could not resolve internal function '%s'", fnID)
}
span, _ := opentracing.StartSpanFromContext(cfg.Ctx, fmt.Sprintf("fnenv/internal/fn/%s", fnID))
span, _ := opentracing.StartSpanFromContext(cfg.Ctx, fmt.Sprintf("/fnenv/internal/%s", fnID))
defer span.Finish()
fnenv.FnActive.WithLabelValues(Name).Inc()
out, err := fn.Invoke(spec)
Expand Down
Loading