-
Notifications
You must be signed in to change notification settings - Fork 55
Add trace support into task #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
33790bd
9fd912b
d11688c
3bbf9d1
a5b3495
7a608b7
ab03cd0
14a9bda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -94,7 +94,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( | |||||
| executor backend.Executor, | ||||||
| req *protos.ActivityRequest, | ||||||
| ) { | ||||||
| var tc *protos.TraceContext = nil // TODO: How to populate trace context? | ||||||
| var tc *protos.TraceContext = req.ParentTraceContext | ||||||
|
||||||
| stream, err := c.client.GetWorkItems(ctx, &req) |
I tried a local run to assert this
This is the context we build up on grpc server side

This is the context at
durabletask-go/client/worker_grpc.go
Line 93 in fc57567
| ctx context.Context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copying from here: #31 (comment)
Agree @kaibocai. I looked into it as well. We can easily propagate context for a normal grpc request(non-streaming) using otel interceptors(e.g. here) but for streaming requests, context is propagated at the start only i.e. when the stream is created. Once the stream creation is done, then since no context is passed during stream.send() method, and because we can't update stream context, the newer context(updated after stream creation) is never passed to client/server.
Here in DTF-Go as well at the time of stream creation, there is nothing in context(since no orchestration/acitivity are created yet). We have the updated context(with all span info) when activity/orchestration workitem needs to be sent to the app but this context is not passed while sending the workItem. So best way to do it is to encode trace context in the message itself.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -2,7 +2,9 @@ package task | |||
|
|
||||
| import ( | ||||
| "context" | ||||
| "fmt" | ||||
|
|
||||
| "github.com/microsoft/durabletask-go/internal/helpers" | ||||
| "github.com/microsoft/durabletask-go/internal/protos" | ||||
| "google.golang.org/protobuf/types/known/wrapperspb" | ||||
| ) | ||||
|
|
@@ -52,6 +54,10 @@ type activityContext struct { | |||
| type Activity func(ctx ActivityContext) (any, error) | ||||
|
|
||||
| func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext { | ||||
| ctx, err := helpers.ContextFromTraceContext(ctx, ts.ParentTraceContext) | ||||
|
||||
| func (c *TaskHubGrpcClient) processActivityWorkItem( |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ import ( | |
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "go.opentelemetry.io/otel" | ||
|
|
||
| "github.com/microsoft/durabletask-go/api" | ||
| "github.com/microsoft/durabletask-go/backend" | ||
|
|
@@ -19,6 +20,8 @@ import ( | |
| "github.com/microsoft/durabletask-go/task" | ||
| ) | ||
|
|
||
| var tracer = otel.Tracer("orchestration-test") | ||
|
|
||
| func Test_EmptyOrchestration(t *testing.T) { | ||
| // Registration | ||
| r := task.NewTaskRegistry() | ||
|
|
@@ -207,6 +210,56 @@ func Test_SingleActivity(t *testing.T) { | |
| ) | ||
| } | ||
|
|
||
| func Test_SingleActivity_TaskSpan(t *testing.T) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be a test added in https://github.com/microsoft/durabletask-go/blob/main/tests/grpc/grpc_test.go as well to validate we are correctly passing the context information over grpc stream. |
||
| // Registration | ||
| r := task.NewTaskRegistry() | ||
| r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) { | ||
| var input string | ||
| if err := ctx.GetInput(&input); err != nil { | ||
| return nil, err | ||
| } | ||
| var output string | ||
| err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output) | ||
| return output, err | ||
| }) | ||
| r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) { | ||
| var name string | ||
| if err := ctx.GetInput(&name); err != nil { | ||
| return nil, err | ||
| } | ||
| _, childSpan := tracer.Start(ctx.Context(), "activityChild") | ||
| childSpan.End() | ||
| return fmt.Sprintf("Hello, %s!", name), nil | ||
| }) | ||
|
|
||
| // Initialization | ||
| ctx := context.Background() | ||
| exporter := initTracing() | ||
| client, worker := initTaskHubWorker(ctx, r) | ||
| defer worker.Shutdown(ctx) | ||
|
|
||
| // Run the orchestration | ||
| id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界")) | ||
| if assert.NoError(t, err) { | ||
| metadata, err := client.WaitForOrchestrationCompletion(ctx, id) | ||
| if assert.NoError(t, err) { | ||
| assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) | ||
| assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) | ||
| } | ||
| } | ||
|
|
||
| // Validate the exported OTel traces | ||
| spans := exporter.GetSpans().Snapshots() | ||
| assertSpanSequence(t, spans, | ||
| assertOrchestratorCreated("SingleActivity", id), | ||
| assertSpan("activityChild"), | ||
| assertActivity("SayHello", id, 0), | ||
| assertOrchestratorExecuted("SingleActivity", id, "COMPLETED"), | ||
| ) | ||
| // assert child-parent relationship | ||
| assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID()) | ||
| } | ||
|
|
||
| func Test_ActivityChain(t *testing.T) { | ||
| // Registration | ||
| r := task.NewTaskRegistry() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to reset the
ParentTraceContexthere as the newly created context above already has the span information. It can be extracted in theExecuteActivityitself.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I still need to set the
ParentTraceContextas I need to populate it heredurabletask-go/backend/executor.go
Line 151 in d11688c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can actually get span from context itself: https://pkg.go.dev/go.opentelemetry.io/otel/trace#SpanFromContext and then get traceContext from span, but i guess it's easier updating the event itself.
My issue was updating the event was that it should't cause any issue in case of errors/retries(like recursively adding a new span under activity span), but it doesn't seem anything like that happening for now, so no issues.