From ab1a4b4a4a68dfc6bb5cf5bd4acd5dc693be631f Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 16 Jul 2024 16:59:13 +0800 Subject: [PATCH] Add producer-consumer telemetry example to stress app --- .../Stress.ApiService/ProducerConsumer.cs | 69 +++++++++++++++++++ .../Stress/Stress.ApiService/Program.cs | 11 ++- 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 playground/Stress/Stress.ApiService/ProducerConsumer.cs diff --git a/playground/Stress/Stress.ApiService/ProducerConsumer.cs b/playground/Stress/Stress.ApiService/ProducerConsumer.cs new file mode 100644 index 00000000000..5f581644360 --- /dev/null +++ b/playground/Stress/Stress.ApiService/ProducerConsumer.cs @@ -0,0 +1,69 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; +using System.Threading.Channels; + +namespace Stress.ApiService; + +public class Data +{ + public required int Id { get; init; } + public required Activity? Producer { get; init; } + public required string Message { get; init; } +} + +public class ProducerConsumer +{ + public const string ActivitySourceName = "ProducerConsumer"; + + private readonly Channel _channel = Channel.CreateUnbounded(); + + private static readonly ActivitySource s_activitySource = new(ActivitySourceName); + + public async Task ProduceAndConsumeAsync(int count) + { + var consumerTask = Task.Run(async () => + { + using var appActivity = s_activitySource.StartActivity("ConsumerApp", ActivityKind.Internal); + + await foreach (var item in _channel.Reader.ReadAllAsync()) + { + var links = new List(); + if (item.Producer != null) + { + links.Add(new ActivityLink(new ActivityContext(item.Producer.TraceId, item.Producer.SpanId, ActivityTraceFlags.None))); + } + + using var activity = s_activitySource.StartActivity($"Consume {item.Id}", ActivityKind.Consumer, parentId: null, links: links); + + await Task.Delay(Random.Shared.Next(10, 50)); + } + }); + + using var appActivity = s_activitySource.StartActivity("ProducerApp", ActivityKind.Internal); + + for (var i = 0; i < count; i++) + { + var id = i + 1; + + Data data; + using (var activity = s_activitySource.StartActivity($"Produce {id}", ActivityKind.Producer)) + { + await Task.Delay(Random.Shared.Next(10, 50)); + + data = new Data + { + Id = id, + Producer = activity, + Message = $"Message {id}" + }; + } + + await _channel.Writer.WriteAsync(data); + } + _channel.Writer.Complete(); + + await consumerTask; + } +} diff --git a/playground/Stress/Stress.ApiService/Program.cs b/playground/Stress/Stress.ApiService/Program.cs index a7e38cdc647..5aee2a033f3 100644 --- a/playground/Stress/Stress.ApiService/Program.cs +++ b/playground/Stress/Stress.ApiService/Program.cs @@ -11,7 +11,7 @@ builder.AddServiceDefaults(); builder.Services.AddOpenTelemetry() - .WithTracing(tracing => tracing.AddSource(TraceCreator.ActivitySourceName)); + .WithTracing(tracing => tracing.AddSource(TraceCreator.ActivitySourceName, ProducerConsumer.ActivitySourceName)); var app = builder.Build(); @@ -119,4 +119,13 @@ async IAsyncEnumerable WriteOutput() } }); +app.MapGet("/producer-consumer", async () => +{ + var producerConsumer = new ProducerConsumer(); + + await producerConsumer.ProduceAndConsumeAsync(count: 5); + + return "Produced and consumed"; +}); + app.Run();