From bae86e91c9215ff099be92636a0c6da1efefeb3b Mon Sep 17 00:00:00 2001 From: g7ed6e Date: Tue, 12 Dec 2023 17:15:23 +0100 Subject: [PATCH 1/9] Add ConfluentKafka Apply pr suggestions Apply pr suggestions Add missing new line in CHANGELOG.md Apply pr suggestions DisablePackageBaselineValidation and drop depebdebcy to System.Reflection.Emit.Lightweight Update sample to use OTLP exporter Update UT Update prepare-release.yml Update codecov.yml Remove duplicated activity stop call Remove parent/child relation between publish and receive spans Use static singletons Add ability to share instrumentation between TraceProviderBuilder and MeterProviderBuilder Update src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs Co-authored-by: Mikel Blanchard Update src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs Co-authored-by: Mikel Blanchard Update src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs Co-authored-by: Mikel Blanchard Update src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs Co-authored-by: Mikel Blanchard Set Activity status to ERROR and set error.type tag fix Examples.Kafka README.md Run dotnet format on new code Update metering implementation to follow 1.24 --- .github/ISSUE_TEMPLATE/bug_report.yml | 1 + .github/ISSUE_TEMPLATE/feature_request.yml | 1 + .github/codecov.yml | 5 + .github/component_owners.yml | 4 + .github/workflows/ci.yml | 14 + .github/workflows/prepare-release.yml | 1 + build/Common.props | 1 + ...emetry.Instrumentation.ConfluentKafka.proj | 33 ++ examples/kafka/Constants.cs | 9 + examples/kafka/Examples.ConfluentKafka.csproj | 17 + examples/kafka/ProduceConsumeHostedService.cs | 53 +++ examples/kafka/Program.cs | 50 +++ examples/kafka/README.md | 14 + opentelemetry-dotnet-contrib.sln | 24 ++ .../.publicApi/PublicAPI.Shipped.txt | 2 + .../.publicApi/PublicAPI.Unshipped.txt | 20 + .../AssemblyInfo.cs | 18 + .../CHANGELOG.md | 5 + .../ConfluentKafkaCommon.cs | 14 + .../ConfluentKafkaConsumerInstrumentation.cs | 18 + ...uentKafkaConsumerInstrumentationOptions.cs | 11 + .../ConfluentKafkaProducerInstrumentation.cs | 18 + ...uentKafkaProducerInstrumentationOptions.cs | 11 + .../ConsumerMeterInstrumentation.cs | 36 ++ .../InstrumentedConsumer.cs | 395 ++++++++++++++++++ .../InstrumentedConsumerBuilder.cs | 46 ++ .../InstrumentedProducer.cs | 375 +++++++++++++++++ .../InstrumentedProducerBuilder.cs | 43 ++ ...MeterProviderBuilderExtensions.Consumer.cs | 90 ++++ ...MeterProviderBuilderExtensions.Producer.cs | 90 ++++ ...etry.Instrumentation.ConfluentKafka.csproj | 35 ++ .../ProducerMeterInstrumentation.cs | 36 ++ .../README.md | 11 + ...racerProviderBuilderExtensions.Consumer.cs | 90 ++++ ...racerProviderBuilderExtensions.Producer.cs | 90 ++++ src/Shared/SemanticConventions.cs | 20 + .../Dockerfile | 19 + .../HostedMeteringTests.cs | 113 +++++ .../HostedTracingAndMeteringTests.cs | 127 ++++++ .../HostedTracingTests.cs | 99 +++++ .../KafkaHelpers.cs | 32 ++ .../MeterProviderExtensions.cs | 19 + .../MeteringTests.cs | 139 ++++++ ...nstrumentation.ConfluentKafka.Tests.csproj | 28 ++ .../ProducerExtensions.cs | 19 + .../TracingTests.cs | 308 ++++++++++++++ .../docker-compose.yml | 24 ++ 47 files changed, 2628 insertions(+) create mode 100644 build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj create mode 100644 examples/kafka/Constants.cs create mode 100644 examples/kafka/Examples.ConfluentKafka.csproj create mode 100644 examples/kafka/ProduceConsumeHostedService.cs create mode 100644 examples/kafka/Program.cs create mode 100644 examples/kafka/README.md create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs create mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs create mode 100644 test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 79cb5b92b7..26d7a172bf 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -30,6 +30,7 @@ body: - OpenTelemetry.Instrumentation.AWS - OpenTelemetry.Instrumentation.AWSLambda - OpenTelemetry.Instrumentation.Cassandra + - OpenTelemetry.Instrumentation.ConfluentKafka - OpenTelemetry.Instrumentation.ElasticsearchClient - OpenTelemetry.Instrumentation.EntityFrameworkCore - OpenTelemetry.Instrumentation.EventCounters diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml index 11b6f1362d..e48fec3575 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yml +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -30,6 +30,7 @@ body: - OpenTelemetry.Instrumentation.AWS - OpenTelemetry.Instrumentation.AWSLambda - OpenTelemetry.Instrumentation.Cassandra + - OpenTelemetry.Instrumentation.ConfluentKafka - OpenTelemetry.Instrumentation.ElasticsearchClient - OpenTelemetry.Instrumentation.EntityFrameworkCore - OpenTelemetry.Instrumentation.EventCounters diff --git a/.github/codecov.yml b/.github/codecov.yml index 0d3bda059b..a8f7fd2412 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -63,6 +63,11 @@ flags: paths: - src/OpenTelemetry.Instrumentation.AspNetCore + unittests-Instrumentation.ConfluentKafka: + carryforward: true + paths: + - src/OpenTelemetry.Instrumentation.ConfluentKafka + unittests-Instrumentation.EventCounters: carryforward: true paths: diff --git a/.github/component_owners.yml b/.github/component_owners.yml index b7c252688e..d4a5aa9ca3 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -35,6 +35,8 @@ components: - ppittle src/OpenTelemetry.Instrumentation.Cassandra/: - xsoheilalizadeh + src/OpenTelemetry.Instrumentation.ConfluentKafka/: + - g7ed6e src/OpenTelemetry.Instrumentation.ElasticsearchClient/: - ejsmith src/OpenTelemetry.Instrumentation.EventCounters/: @@ -130,6 +132,8 @@ components: - ppittle test/OpenTelemetry.Instrumentation.Cassandra.Tests/: - xsoheilalizadeh + test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/: + - g7ed6e test/OpenTelemetry.Instrumentation.ElasticsearchClient.Tests/: - ejsmith test/OpenTelemetry.Instrumentation.EventCounters.Tests/: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ceea50c2e..dac9fe22d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,7 @@ jobs: instrumentation-aws: ['*/OpenTelemetry.Instrumentation.AWS/**', '*/OpenTelemetry.Instrumentation.AWS.Tests/**', '!**/*.md'] instrumentation-aws-lambda: ['*/OpenTelemetry.Instrumentation.AWSLambda/**', '*/OpenTelemetry.Instrumentation.AWSLambda.Tests/**', '!**/*.md'] instrumentation-cassandra: ['*/OpenTelemetry.Instrumentation.Cassandra*/**', '!**/*.md'] + instrumentation-confluentkafka: ['*/OpenTelemetry.Instrumentation.ConfluentKafka*/**', 'examples/kafka/**', '!**/*.md'] instrumentation-elasticsearchclient: ['*/OpenTelemetry.Instrumentation.ElasticsearchClient*/**', '!**/*.md'] instrumentation-entityframeworkcore: ['*/OpenTelemetry.Instrumentation.EntityFrameworkCore*/**', '!**/*.md'] instrumentation-eventcounters: ['*/OpenTelemetry.Instrumentation.EventCounters*/**', 'examples/event-counters/**', '!**/*.md'] @@ -222,6 +223,17 @@ jobs: project-name: Component[OpenTelemetry.Instrumentation.Cassandra] code-cov-name: Instrumentation.Cassandra + build-test-instrumentation-confluentkafka: + needs: detect-changes + if: | + contains(needs.detect-changes.outputs.changes, 'instrumentation-confluentkafka') + || contains(needs.detect-changes.outputs.changes, 'build') + || contains(needs.detect-changes.outputs.changes, 'shared') + uses: ./.github/workflows/Component.BuildTest.yml + with: + project-name: Component[OpenTelemetry.Instrumentation.ConfluentKafka] + code-cov-name: Instrumentation.ConfluentKafka + build-test-instrumentation-elasticsearchclient: needs: detect-changes if: | @@ -518,6 +530,7 @@ jobs: || contains(needs.detect-changes.outputs.changes, 'instrumentation-aspnetcore') || contains(needs.detect-changes.outputs.changes, 'instrumentation-aws') || contains(needs.detect-changes.outputs.changes, 'instrumentation-awslambda') + || contains(needs.detect-changes.outputs.changes, 'instrumentation-confluentkafka') || contains(needs.detect-changes.outputs.changes, 'instrumentation-eventcounters') || contains(needs.detect-changes.outputs.changes, 'instrumentation-grpcnetclient') || contains(needs.detect-changes.outputs.changes, 'instrumentation-http') @@ -556,6 +569,7 @@ jobs: build-test-instrumentation-aws, build-test-instrumentation-awslambda, build-test-instrumentation-cassandra, + build-test-instrumentation-confluentkafka, build-test-instrumentation-elasticsearchclient, build-test-instrumentation-entityframeworkcore, build-test-instrumentation-eventcounters, diff --git a/.github/workflows/prepare-release.yml b/.github/workflows/prepare-release.yml index eea3b61943..897135b2b7 100644 --- a/.github/workflows/prepare-release.yml +++ b/.github/workflows/prepare-release.yml @@ -20,6 +20,7 @@ on: - OpenTelemetry.Instrumentation.AWS - OpenTelemetry.Instrumentation.AWSLambda - OpenTelemetry.Instrumentation.Cassandra + - OpenTelemetry.Instrumentation.ConfluentKafka - OpenTelemetry.Instrumentation.ElasticsearchClient - OpenTelemetry.Instrumentation.EntityFrameworkCore - OpenTelemetry.Instrumentation.EventCounters diff --git a/build/Common.props b/build/Common.props index 9099eb0bff..6f0a9ae442 100644 --- a/build/Common.props +++ b/build/Common.props @@ -42,6 +42,7 @@ [1.9.0,2.0) [1.9.0-rc.1] [2.1.58,3.0) + [2.3.0,3.0) [3.16.0,4.0) [1.2.0-beta.507,2.0) [4.3.4,) diff --git a/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj new file mode 100644 index 0000000000..d0295e54da --- /dev/null +++ b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj @@ -0,0 +1,33 @@ + + + + $([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.Parent.FullName) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/kafka/Constants.cs b/examples/kafka/Constants.cs new file mode 100644 index 0000000000..af3cf026c4 --- /dev/null +++ b/examples/kafka/Constants.cs @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace Examples.ConfluentKafka; + +public static class Constants +{ + public static readonly string Topic = $"test-topic-{Guid.NewGuid()}"; +} diff --git a/examples/kafka/Examples.ConfluentKafka.csproj b/examples/kafka/Examples.ConfluentKafka.csproj new file mode 100644 index 0000000000..b9ce8848cb --- /dev/null +++ b/examples/kafka/Examples.ConfluentKafka.csproj @@ -0,0 +1,17 @@ + + + Exe + net8.0 + enable + enable + + + + + + + + + + + diff --git a/examples/kafka/ProduceConsumeHostedService.cs b/examples/kafka/ProduceConsumeHostedService.cs new file mode 100644 index 0000000000..04d14dd893 --- /dev/null +++ b/examples/kafka/ProduceConsumeHostedService.cs @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using OpenTelemetry.Instrumentation.ConfluentKafka; + +namespace Examples.ConfluentKafka; + +public class ProduceConsumeHostedService( + InstrumentedProducerBuilder instrumentedProducerBuilder, + InstrumentedConsumerBuilder instrumentedConsumerBuilder) + : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + IProducer producer = instrumentedProducerBuilder.Build(); + IConsumer consumer = instrumentedConsumerBuilder.Build(); + + for (int j = 0; j < 100; j++) + { + await producer.ProduceAsync( + Constants.Topic, + new Message { Key = "any_key", Value = $"any_value_{j}" }, + stoppingToken); + } + + for (int j = 0; j < 100; j++) + { + producer.Produce( + Constants.Topic, + new Message { Key = "any_key", Value = $"any_value_{j}" }); + } + + producer.Flush(stoppingToken); + + consumer.Subscribe(Constants.Topic); + while (!stoppingToken.IsCancellationRequested) + { + ConsumeResult consumeResult = consumer.Consume(stoppingToken); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + + Console.WriteLine($"Consumer {consumer.Name} received message: {consumeResult.Message.Value}"); + } + } +} diff --git a/examples/kafka/Program.cs b/examples/kafka/Program.cs new file mode 100644 index 0000000000..fe9c6dbcdd --- /dev/null +++ b/examples/kafka/Program.cs @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Confluent.Kafka; +using Examples.ConfluentKafka; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +var builder = Host.CreateApplicationBuilder(args); + +const string bootstrapServers = "localhost:9092"; + +builder.Services.AddSingleton(_ => +{ + ProducerConfig producerConfig = new() { BootstrapServers = bootstrapServers }; + return new InstrumentedProducerBuilder(producerConfig); +}); +builder.Services.AddSingleton(_ => +{ + ConsumerConfig consumerConfigA = new() + { + BootstrapServers = bootstrapServers, + GroupId = "group-a", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + return new InstrumentedConsumerBuilder(consumerConfigA); +}); + +builder.Services.AddOpenTelemetry() + .WithTracing(tracing => + { + tracing.AddConsoleExporter() + .AddOtlpExporter() + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }) + .WithMetrics(metering => + { + metering.AddConsoleExporter() + .AddOtlpExporter() + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + +builder.Services.AddHostedService(); + +var app = builder.Build(); +await app.RunAsync(); diff --git a/examples/kafka/README.md b/examples/kafka/README.md new file mode 100644 index 0000000000..b4349e238c --- /dev/null +++ b/examples/kafka/README.md @@ -0,0 +1,14 @@ +# Run Examples.ConfluentKafka + +Start the Confluent Kafka stack: + +```cmd +docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local +``` + +Start the Aspire Dashboard: + +```cmd +docker run --rm -it -p 18888:18888 -p 4317:18889 -d --name aspire-dashboard mcr.microsoft.com/dotnet/nightly/aspire-dashboard:8.0.0 +``` + diff --git a/opentelemetry-dotnet-contrib.sln b/opentelemetry-dotnet-contrib.sln index f0a4560077..65a1740bef 100644 --- a/opentelemetry-dotnet-contrib.sln +++ b/opentelemetry-dotnet-contrib.sln @@ -377,6 +377,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ISSUE_TEMPLATE", "ISSUE_TEM .github\ISSUE_TEMPLATE\feature_request.yml = .github\ISSUE_TEMPLATE\feature_request.yml EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka", "src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj", "{96341E23-990E-4144-A7E3-9EF0DAFF3232}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka.Tests", "test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj", "{BE40900A-2859-471D-8802-21DFD73DDAA7}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kafka", "kafka", "{3A464E7A-42F3-44B0-B8D7-80521A7704A6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Examples.ConfluentKafka", "examples\kafka\Examples.ConfluentKafka.csproj", "{9B994669-E839-4C42-A0F1-DF9DD058C1DC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -779,6 +787,18 @@ Global {B13394D6-D3D7-453E-B91A-24C199F41C5E}.Debug|Any CPU.Build.0 = Debug|Any CPU {B13394D6-D3D7-453E-B91A-24C199F41C5E}.Release|Any CPU.ActiveCfg = Release|Any CPU {B13394D6-D3D7-453E-B91A-24C199F41C5E}.Release|Any CPU.Build.0 = Release|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Debug|Any CPU.Build.0 = Debug|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.ActiveCfg = Release|Any CPU + {96341E23-990E-4144-A7E3-9EF0DAFF3232}.Release|Any CPU.Build.0 = Release|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BE40900A-2859-471D-8802-21DFD73DDAA7}.Release|Any CPU.Build.0 = Release|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9B994669-E839-4C42-A0F1-DF9DD058C1DC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -899,6 +919,10 @@ Global {70CA77D4-5D7F-4D70-A6B5-8AAC07A8EA3C} = {2097345F-4DD3-477D-BC54-A922F9B2B402} {45D29DAA-0DB9-4808-B879-1AECC37EF366} = {824BD1DE-3FA8-4FE0-823A-FD365EAC78AF} {40373C78-0513-4067-A96B-96A851369761} = {1A06E14B-DD2F-4536-9D2E-F708C0C43555} + {96341E23-990E-4144-A7E3-9EF0DAFF3232} = {22DF5DC0-1290-4E83-A9D8-6BB7DE3B3E63} + {BE40900A-2859-471D-8802-21DFD73DDAA7} = {2097345F-4DD3-477D-BC54-A922F9B2B402} + {3A464E7A-42F3-44B0-B8D7-80521A7704A6} = {B75EE478-97F7-4E9F-9A5A-DB3D0988EDEA} + {9B994669-E839-4C42-A0F1-DF9DD058C1DC} = {3A464E7A-42F3-44B0-B8D7-80521A7704A6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B0816796-CDB3-47D7-8C3C-946434DE3B66} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt new file mode 100644 index 0000000000..074c6ad103 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Shipped.txt @@ -0,0 +1,2 @@ +#nullable enable + diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt new file mode 100644 index 0000000000..d27211c49c --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/.publicApi/PublicAPI.Unshipped.txt @@ -0,0 +1,20 @@ +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.InstrumentedConsumerBuilder(System.Collections.Generic.IEnumerable>! config) -> void +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder +OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.InstrumentedProducerBuilder(System.Collections.Generic.IEnumerable>! config) -> void +OpenTelemetry.Metrics.MeterProviderBuilderExtensions +OpenTelemetry.Trace.TracerProviderBuilderExtensions +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder.Build() -> Confluent.Kafka.IConsumer! +override OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder.Build() -> Confluent.Kafka.IProducer! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder? consumerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Metrics.MeterProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Metrics.MeterProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder? producerBuilder) -> OpenTelemetry.Metrics.MeterProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder! consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaConsumerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedConsumerBuilder? consumerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder! producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! +static OpenTelemetry.Trace.TracerProviderBuilderExtensions.AddKafkaProducerInstrumentation(this OpenTelemetry.Trace.TracerProviderBuilder! builder, string? name, OpenTelemetry.Instrumentation.ConfluentKafka.InstrumentedProducerBuilder? producerBuilder) -> OpenTelemetry.Trace.TracerProviderBuilder! diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs new file mode 100644 index 0000000000..9bb32bb7a6 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/AssemblyInfo.cs @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("OpenTelemetry.Instrumentation.ConfluentKafka.Tests" + AssemblyInfo.PublicKey)] + +#if SIGNED +internal static class AssemblyInfo +{ + public const string PublicKey = ", PublicKey=002400000480000094000000060200000024000052534131000400000100010051C1562A090FB0C9F391012A32198B5E5D9A60E9B80FA2D7B434C9E5CCB7259BD606E66F9660676AFC6692B8CDC6793D190904551D2103B7B22FA636DCBB8208839785BA402EA08FC00C8F1500CCEF28BBF599AA64FFB1E1D5DC1BF3420A3777BADFE697856E9D52070A50C3EA5821C80BEF17CA3ACFFA28F89DD413F096F898"; +} +#else +internal static class AssemblyInfo +{ + public const string PublicKey = ""; +} +#endif diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md b/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md new file mode 100644 index 0000000000..134621e04d --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## Unreleased + +* Initial release diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs new file mode 100644 index 0000000000..2a8b86016e --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal static class ConfluentKafkaCommon +{ + internal static readonly string InstrumentationName = typeof(ConfluentKafkaCommon).Assembly.GetName().Name!; + internal static readonly string InstrumentationVersion = typeof(ConfluentKafkaCommon).Assembly.GetPackageVersion(); + internal static readonly ActivitySource ActivitySource = new(InstrumentationName, InstrumentationVersion); +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs new file mode 100644 index 0000000000..573b78d56b --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentation.cs @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaConsumerInstrumentation; + +#pragma warning disable SA1402 // File may only contain a single type +internal sealed class ConfluentKafkaConsumerInstrumentation : ConfluentKafkaConsumerInstrumentation +#pragma warning restore SA1402 // File may only contain a single type +{ + public ConfluentKafkaConsumerInstrumentation(InstrumentedConsumerBuilder consumerBuilder) + { + this.ConsumerBuilder = consumerBuilder; + } + + internal InstrumentedConsumerBuilder ConsumerBuilder { get; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs new file mode 100644 index 0000000000..c2a108c100 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaConsumerInstrumentationOptions.cs @@ -0,0 +1,11 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaConsumerInstrumentationOptions +{ + public bool Metrics { get; set; } + + public bool Traces { get; set; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs new file mode 100644 index 0000000000..13f71b1905 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentation.cs @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal abstract class ConfluentKafkaProducerInstrumentation; + +#pragma warning disable SA1402 // File may only contain a single type +internal sealed class ConfluentKafkaProducerInstrumentation : ConfluentKafkaProducerInstrumentation +#pragma warning restore SA1402 // File may only contain a single type +{ + public ConfluentKafkaProducerInstrumentation(InstrumentedProducerBuilder producerBuilder) + { + this.ProducerBuilder = producerBuilder; + } + + internal InstrumentedProducerBuilder ProducerBuilder { get; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs new file mode 100644 index 0000000000..cb97821416 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaProducerInstrumentationOptions.cs @@ -0,0 +1,11 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class ConfluentKafkaProducerInstrumentationOptions +{ + public bool Metrics { get; set; } + + public bool Traces { get; set; } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs new file mode 100644 index 0000000000..912fdde9f1 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics.Metrics; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal sealed class ConsumerMeterInstrumentation : IDisposable +{ + private readonly Meter meter; + private readonly Counter receiveMessagesCounter; + private readonly Histogram receiveDurationHistogram; + + public ConsumerMeterInstrumentation() + { + this.meter = new Meter(ConfluentKafkaCommon.InstrumentationName, ConfluentKafkaCommon.InstrumentationVersion); + this.receiveMessagesCounter = this.meter.CreateCounter(SemanticConventions.MetricMessagingReceiveMessages); + this.receiveDurationHistogram = this.meter.CreateHistogram(SemanticConventions.MetricMessagingReceiveDuration); + } + + public void RecordReceivedMessage(ReadOnlySpan> tags) + { + this.receiveMessagesCounter.Add(1, tags); + } + + public void RecordReceiveDuration(double duration, ReadOnlySpan> tags) + { + this.receiveDurationHistogram.Record(duration, tags); + } + + public void Dispose() + { + this.meter.Dispose(); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs new file mode 100644 index 0000000000..d279d71c48 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs @@ -0,0 +1,395 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Text; +using Confluent.Kafka; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal class InstrumentedConsumer : IConsumer +{ + private const string ReceiveOperationName = "receive"; + private const string KafkaMessagingSystem = "kafka"; + private readonly ConsumerMeterInstrumentation consumerMeterInstrumentation = new(); + private readonly IConsumer consumer; + private readonly ConfluentKafkaConsumerInstrumentationOptions options; + + public InstrumentedConsumer(IConsumer consumer, ConfluentKafkaConsumerInstrumentationOptions options) + { + this.consumer = consumer; + this.options = options; + } + + public Handle Handle => this.consumer.Handle; + + public string Name => this.consumer.Name; + + public string MemberId => this.consumer.MemberId; + + public List Assignment => this.consumer.Assignment; + + public List Subscription => this.consumer.Subscription; + + public IConsumerGroupMetadata ConsumerGroupMetadata => this.consumer.ConsumerGroupMetadata; + + public string? GroupId { get; internal set; } + + public void Dispose() + { + this.consumerMeterInstrumentation.Dispose(); + this.consumer.Dispose(); + } + + public int AddBrokers(string brokers) + { + return this.consumer.AddBrokers(brokers); + } + + public void SetSaslCredentials(string username, string password) + { + this.consumer.SetSaslCredentials(username, password); + } + + public ConsumeResult? Consume(int millisecondsTimeout) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + try + { + var result = this.consumer.Consume(millisecondsTimeout); + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, result); + } + + return result; + } + catch (ConsumeException e) + { + this.InstrumentConsumptionError(start, e); + throw; + } + } + + public ConsumeResult? Consume(CancellationToken cancellationToken = default) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + + try + { + var result = this.consumer.Consume(cancellationToken); + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, result); + } + + return result; + } + catch (ConsumeException e) + { + this.InstrumentConsumptionError(start, e); + throw; + } + } + + public ConsumeResult? Consume(TimeSpan timeout) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + + try + { + var result = this.consumer.Consume(timeout); + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, result); + } + + return result; + } + catch (ConsumeException e) + { + this.InstrumentConsumptionError(start, e); + throw; + } + } + + public void Subscribe(IEnumerable topics) + { + this.consumer.Subscribe(topics); + } + + public void Subscribe(string topic) + { + this.consumer.Subscribe(topic); + } + + public void Unsubscribe() + { + this.consumer.Unsubscribe(); + } + + public void Assign(TopicPartition partition) + { + this.consumer.Assign(partition); + } + + public void Assign(TopicPartitionOffset partition) + { + this.consumer.Assign(partition); + } + + public void Assign(IEnumerable partitions) + { + this.consumer.Assign(partitions); + } + + public void Assign(IEnumerable partitions) + { + this.consumer.Assign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + this.consumer.IncrementalAssign(partitions); + } + + public void IncrementalAssign(IEnumerable partitions) + { + this.consumer.IncrementalAssign(partitions); + } + + public void IncrementalUnassign(IEnumerable partitions) + { + this.consumer.IncrementalUnassign(partitions); + } + + public void Unassign() + { + this.consumer.Unassign(); + } + + public void StoreOffset(ConsumeResult result) + { + this.consumer.StoreOffset(result); + } + + public void StoreOffset(TopicPartitionOffset offset) + { + this.consumer.StoreOffset(offset); + } + + public List Commit() + { + return this.consumer.Commit(); + } + + public void Commit(IEnumerable offsets) + { + this.consumer.Commit(offsets); + } + + public void Commit(ConsumeResult result) + { + this.consumer.Commit(result); + } + + public void Seek(TopicPartitionOffset tpo) + { + this.consumer.Seek(tpo); + } + + public void Pause(IEnumerable partitions) + { + this.consumer.Pause(partitions); + } + + public void Resume(IEnumerable partitions) + { + this.consumer.Resume(partitions); + } + + public List Committed(TimeSpan timeout) + { + return this.consumer.Committed(timeout); + } + + public List Committed(IEnumerable partitions, TimeSpan timeout) + { + return this.consumer.Committed(partitions, timeout); + } + + public Offset Position(TopicPartition partition) + { + return this.consumer.Position(partition); + } + + public List OffsetsForTimes(IEnumerable timestampsToSearch, TimeSpan timeout) + { + return this.consumer.OffsetsForTimes(timestampsToSearch, timeout); + } + + public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) + { + return this.consumer.GetWatermarkOffsets(topicPartition); + } + + public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) + { + return this.consumer.QueryWatermarkOffsets(topicPartition, timeout); + } + + public void Close() + { + this.consumer.Close(); + } + + private static string FormatConsumeException(ConsumeException consumeException) => + $"ConsumeException: {consumeException.Error}"; + + private static PropagationContext ExtractPropagationContext(Headers? headers) + => Propagators.DefaultTextMapPropagator.Extract(default, headers, ExtractTraceContext); + + private static IEnumerable ExtractTraceContext(Headers? headers, string value) + { + if (headers?.TryGetLastBytes(value, out var bytes) == true) + { + yield return Encoding.UTF8.GetString(bytes); + } + } + + private static IEnumerable> GetTags(string topic, int? partition = null, string? errorType = null) + { + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingOperation, + ReceiveOperationName); + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingSystem, + KafkaMessagingSystem); + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingDestinationName, + topic); + if (partition is not null) + { + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingKafkaDestinationPartition, + partition); + } + + if (errorType is not null) + { + yield return new KeyValuePair( + SemanticConventions.AttributeErrorType, + errorType); + } + } + + private void RecordReceive(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) + { + var tags = GetTags(topicPartition.Topic, partition: topicPartition.Partition, errorType).ToArray(); + this.consumerMeterInstrumentation.RecordReceivedMessage(tags); + this.consumerMeterInstrumentation.RecordReceiveDuration(duration.TotalSeconds, tags); + } + + private void InstrumentConsumption(DateTimeOffset startTime, ConsumeResult result) + { + Activity? activity = null; + if (this.options.Traces) + { + ConsumeResult consumeResult = new(result.TopicPartitionOffset, result.Message.Headers, result.Message.Key); + PropagationContext propagationContext = consumeResult.Headers != null + ? ExtractPropagationContext(consumeResult.Headers) + : default; + var spanName = string.IsNullOrEmpty(consumeResult.TopicPartitionOffset?.Topic) + ? ReceiveOperationName + : string.Concat(consumeResult.TopicPartitionOffset!.Topic, " ", ReceiveOperationName); + + activity = this.StartReceiveActivity(spanName, propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); + activity?.Stop(); + } + + if (this.options.Metrics) + { + TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - startTime; + this.RecordReceive(result.TopicPartition, duration); + } + + activity?.Dispose(); + } + + private void InstrumentConsumptionError(DateTimeOffset startTime, ConsumeException exception) + { + Activity? activity = null; + string? errorType = null; + if (this.options.Traces) + { + ConsumeResult consumeResult = new(exception.ConsumerRecord.TopicPartitionOffset, exception.ConsumerRecord.Message.Headers, exception.ConsumerRecord.Message.Key); + PropagationContext propagationContext = consumeResult.Headers != null + ? ExtractPropagationContext(consumeResult.Headers) + : default; + var spanName = string.IsNullOrEmpty(consumeResult.TopicPartitionOffset?.Topic) + ? ReceiveOperationName + : string.Concat(consumeResult.TopicPartitionOffset!.Topic, " ", ReceiveOperationName); + + activity = this.StartReceiveActivity(spanName, propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); + errorType = FormatConsumeException(exception); + if (activity != null) + { + activity.SetStatus(ActivityStatusCode.Error); + if (activity.IsAllDataRequested) + { + activity.SetTag(SemanticConventions.AttributeErrorType, errorType); + } + + activity.Stop(); + } + } + + if (this.options.Metrics) + { + TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - startTime; + errorType ??= FormatConsumeException(exception); + this.RecordReceive(exception.ConsumerRecord.TopicPartition, duration, errorType); + } + + activity?.Dispose(); + } + + private Activity? StartReceiveActivity(string spanName, PropagationContext propagationContext, DateTimeOffset start, TopicPartitionOffset? topicPartitionOffset, object? key) + { + ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid() + ? new[] { new ActivityLink(propagationContext.ActivityContext) } + : Array.Empty(); + + Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default); + if (activity?.IsAllDataRequested == true) + { + activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem); + activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name); + activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topicPartitionOffset?.Topic); + activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, topicPartitionOffset?.Partition.Value); + activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageOffset, topicPartitionOffset?.Offset.Value); + activity.SetTag(SemanticConventions.AttributeMessagingKafkaConsumerGroup, this.GroupId); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, ReceiveOperationName); + if (key != null) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, key); + } + } + + return activity; + } + + private readonly record struct ConsumeResult( + TopicPartitionOffset? TopicPartitionOffset, + Headers? Headers, + object? Key = null) + { + public object? Key { get; } = Key; + + public Headers? Headers { get; } = Headers; + + public TopicPartitionOffset? TopicPartitionOffset { get; } = TopicPartitionOffset; + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs new file mode 100644 index 0000000000..93268ba5c7 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumerBuilder.cs @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +/// +/// A builder of with support for instrumentation. +/// +/// Type of the key. +/// Type of value. +public sealed class InstrumentedConsumerBuilder : ConsumerBuilder +{ + /// + /// Initializes a new instance of the class. + /// + /// A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ). At a minimum, 'bootstrap.servers' must be specified. + public InstrumentedConsumerBuilder(IEnumerable> config) + : base(config) + { + } + + internal ConfluentKafkaConsumerInstrumentationOptions? Options { get; set; } + + /// + /// Build a new IConsumer instance. + /// + /// an . + public override IConsumer Build() + { + Debug.Assert(this.Options != null, "Options should not be null."); + + ConsumerConfig config = (ConsumerConfig)this.Config; + if (this.Options!.Metrics) + { + config.StatisticsIntervalMs ??= 1000; + } + + var consumer = new InstrumentedConsumer(base.Build(), this.Options); + consumer.GroupId = config.GroupId; + + return consumer; + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs new file mode 100644 index 0000000000..31d238df61 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs @@ -0,0 +1,375 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Text; +using Confluent.Kafka; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal sealed class InstrumentedProducer : IProducer +{ + private const string PublishOperationName = "publish"; + private const string KafkaMessagingSystem = "kafka"; + + private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator; + private readonly ProducerMeterInstrumentation producerMeterInstrumentation = new(); + private readonly IProducer producer; + private readonly ConfluentKafkaProducerInstrumentationOptions options; + + public InstrumentedProducer( + IProducer producer, + ConfluentKafkaProducerInstrumentationOptions options) + { + this.producer = producer; + this.options = options; + } + + public Handle Handle => this.producer.Handle; + + public string Name => this.producer.Name; + + internal ConfluentKafkaProducerInstrumentationOptions Options => this.options; + + public int AddBrokers(string brokers) + { + return this.producer.AddBrokers(brokers); + } + + public void SetSaslCredentials(string username, string password) + { + this.producer.SetSaslCredentials(username, password); + } + + public async Task> ProduceAsync( + string topic, + Message message, + CancellationToken cancellationToken = default) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartActivity(PublishOperationName, topic, message); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + DeliveryResult result; + string? errorType = null; + try + { + result = await this.producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; + activity?.Stop(); + + if (this.options.Metrics) + { + this.RecordPublish(topic, duration, errorType); + } + } + + return result; + } + + public async Task> ProduceAsync( + TopicPartition topicPartition, + Message message, + CancellationToken cancellationToken = default) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartActivity(topicPartition, message); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + DeliveryResult result; + string? errorType = null; + try + { + result = await this.producer.ProduceAsync(topicPartition, message, cancellationToken).ConfigureAwait(false); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + activity?.Stop(); + TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; + + if (this.options.Metrics) + { + this.RecordPublish(topicPartition, duration, errorType); + } + } + + return result; + } + + public void Produce(string topic, Message message, Action>? deliveryHandler = null) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartActivity(PublishOperationName, topic, message); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + string? errorType = null; + try + { + this.producer.Produce(topic, message, deliveryHandler); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + activity?.Stop(); + TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; + + if (this.options.Metrics) + { + this.RecordPublish(topic, duration, errorType); + } + } + } + + public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) + { + DateTimeOffset start = DateTimeOffset.UtcNow; + using Activity? activity = this.StartActivity(topicPartition, message); + if (activity != null) + { + this.InjectActivity(activity, message); + } + + string? errorType = null; + try + { + this.producer.Produce(topicPartition, message, deliveryHandler); + } + catch (ProduceException produceException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException)); + + throw; + } + catch (ArgumentException argumentException) + { + activity?.SetStatus(ActivityStatusCode.Error); + activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException)); + + throw; + } + finally + { + activity?.Stop(); + TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; + + if (this.options.Metrics) + { + this.RecordPublish(topicPartition, duration, errorType); + } + } + } + + public int Poll(TimeSpan timeout) + { + return this.producer.Poll(timeout); + } + + public int Flush(TimeSpan timeout) + { + return this.producer.Flush(timeout); + } + + public void Flush(CancellationToken cancellationToken = default) + { + this.producer.Flush(cancellationToken); + } + + public void InitTransactions(TimeSpan timeout) + { + this.producer.InitTransactions(timeout); + } + + public void BeginTransaction() + { + this.producer.BeginTransaction(); + } + + public void CommitTransaction(TimeSpan timeout) + { + this.producer.CommitTransaction(timeout); + } + + public void CommitTransaction() + { + this.producer.CommitTransaction(); + } + + public void AbortTransaction(TimeSpan timeout) + { + this.producer.AbortTransaction(timeout); + } + + public void AbortTransaction() + { + this.producer.AbortTransaction(); + } + + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) + { + this.producer.SendOffsetsToTransaction(offsets, groupMetadata, timeout); + } + + public void Dispose() + { + this.producerMeterInstrumentation.Dispose(); + this.producer.Dispose(); + } + + private static string FormatProduceException(ProduceException produceException) => + $"ProduceException: {produceException.Error.Code}"; + + private static string FormatArgumentException(ArgumentException argumentException) => + $"ArgumentException: {argumentException.ParamName}"; + + private static IEnumerable> GetTags(string topic, int? partition = null, string? errorType = null) + { + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingOperation, + PublishOperationName); + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingSystem, + KafkaMessagingSystem); + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingDestinationName, + topic); + if (partition is not null) + { + yield return new KeyValuePair( + SemanticConventions.AttributeMessagingKafkaDestinationPartition, + partition); + } + + if (errorType is not null) + { + yield return new KeyValuePair( + SemanticConventions.AttributeErrorType, + errorType); + } + } + + private void RecordPublish(string topic, TimeSpan duration, string? errorType = null) + { + var tags = GetTags(topic, partition: null, errorType).ToArray(); + this.producerMeterInstrumentation.RecordPublishMessage(tags); + this.producerMeterInstrumentation.RecordPublishDuration(duration.TotalSeconds, tags); + } + + private void RecordPublish(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) + { + var tags = GetTags(topicPartition.Topic, partition: topicPartition.Partition, errorType).ToArray(); + this.producerMeterInstrumentation.RecordPublishMessage(tags); + this.producerMeterInstrumentation.RecordPublishDuration(duration.TotalSeconds, tags); + } + + private Activity? StartActivity(string operation, string topic, Message message) + { + var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(string.Concat(topic, " ", operation), ActivityKind.Producer); + if (activity == null) + { + return null; + } + + if (activity.IsAllDataRequested) + { + activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem); + activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name); + activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topic); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, operation); + + if (message.Key != null) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, message.Key); + } + } + + return activity; + } + + private Activity? StartActivity(TopicPartition topicPartition, Message message) + { + if (!this.options.Traces) + { + return null; + } + + var activity = this.StartActivity(PublishOperationName, topicPartition.Topic, message); + if (activity == null) + { + return null; + } + + if (activity.IsAllDataRequested) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, topicPartition.Partition.Value); + } + + return activity; + } + + private void InjectActivity(Activity? activity, Message message) + { + this.propagator.Inject(new PropagationContext(activity?.Context ?? default, Baggage.Current), message, this.InjectTraceContext); + } + + private void InjectTraceContext(Message message, string key, string value) + { + message.Headers ??= new Headers(); + message.Headers.Add(key, Encoding.UTF8.GetBytes(value)); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs new file mode 100644 index 0000000000..bcc29f5add --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducerBuilder.cs @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +/// +/// A builder of with support for instrumentation. +/// +/// Type of the key. +/// Type of value. +public sealed class InstrumentedProducerBuilder : ProducerBuilder +{ + /// + /// Initializes a new instance of the class. + /// + /// A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ). At a minimum, 'bootstrap.servers' must be specified. + public InstrumentedProducerBuilder(IEnumerable> config) + : base(config) + { + } + + internal ConfluentKafkaProducerInstrumentationOptions? Options { get; set; } + + /// + /// Build a new IProducer instance. + /// + /// an . + public override IProducer Build() + { + Debug.Assert(this.Options != null, "Options should not be null."); + + ProducerConfig config = (ProducerConfig)this.Config; + if (this.Options!.Metrics) + { + config.StatisticsIntervalMs ??= 1000; + } + + return new InstrumentedProducer(base.Build(), this.Options); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs new file mode 100644 index 0000000000..0cc6b91997 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Consumer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Metrics; + +/// +/// Extension methods to simplify registering of Kafka instrumentation. +/// +public static partial class MeterProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaConsumerInstrumentation( + this MeterProviderBuilder builder) + => AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaConsumerInstrumentation( + this MeterProviderBuilder builder, + InstrumentedConsumerBuilder consumerBuilder) + { + Guard.ThrowIfNull(consumerBuilder); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder); + } + + /// + /// Enables the incoming requests automatic data collection for ASP.NET. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The name of the instrumentation. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaConsumerInstrumentation( + this MeterProviderBuilder builder, + string? name, + InstrumentedConsumerBuilder? consumerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableMetrics); + }); + + return builder + .AddMeter(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (consumerBuilder == null) + { + consumerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + consumerBuilder.Options = options.Get(name); + } + + if (consumerBuilder.Options == null) + { + consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions(); + EnableMetrics(consumerBuilder.Options); + } + + return new ConfluentKafkaConsumerInstrumentation(consumerBuilder); + }); + } + + private static void EnableMetrics(ConfluentKafkaConsumerInstrumentationOptions options) => + options.Metrics = true; +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs new file mode 100644 index 0000000000..3e4c4021db --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/MeterProviderBuilderExtensions.Producer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Metrics; + +/// +/// Extension methods to simplify registering of Kafka instrumentation. +/// +public static partial class MeterProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaProducerInstrumentation( + this MeterProviderBuilder builder) + => AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaProducerInstrumentation( + this MeterProviderBuilder builder, + InstrumentedProducerBuilder producerBuilder) + { + Guard.ThrowIfNull(producerBuilder); + + return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder); + } + + /// + /// Enables the incoming requests automatic data collection for ASP.NET. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The name of the instrumentation. + /// to instrument. + /// The instance of to chain the calls. + public static MeterProviderBuilder AddKafkaProducerInstrumentation( + this MeterProviderBuilder builder, + string? name, + InstrumentedProducerBuilder? producerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableMetrics); + }); + + return builder + .AddMeter(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (producerBuilder == null) + { + producerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + producerBuilder.Options = options.Get(name); + } + + if (producerBuilder.Options == null) + { + producerBuilder.Options = new ConfluentKafkaProducerInstrumentationOptions(); + EnableMetrics(producerBuilder.Options); + } + + return new ConfluentKafkaProducerInstrumentation(producerBuilder); + }); + } + + private static void EnableMetrics(ConfluentKafkaProducerInstrumentationOptions options) => + options.Metrics = true; +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj new file mode 100644 index 0000000000..0d97d693a4 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/OpenTelemetry.Instrumentation.ConfluentKafka.csproj @@ -0,0 +1,35 @@ + + + + net8.0;net6.0;net462 + Confluent.Kafka instrumentation for OpenTelemetry .NET + $(PackageTags);distributed-tracing;Kafka;Confluent.Kafka + true + Instrumentation.ConfluentKafka- + true + + + + + true + + + + + + + + + + + + + + + + + + + + diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs new file mode 100644 index 0000000000..4288b92fa2 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics.Metrics; +using OpenTelemetry.Trace; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka; + +internal sealed class ProducerMeterInstrumentation : IDisposable +{ + private readonly Meter meter; + private readonly Counter publishMessagesCounter; + private readonly Histogram publishDurationHistogram; + + public ProducerMeterInstrumentation() + { + this.meter = new Meter(ConfluentKafkaCommon.InstrumentationName, ConfluentKafkaCommon.InstrumentationVersion); + this.publishMessagesCounter = this.meter.CreateCounter(SemanticConventions.MetricMessagingPublishMessages); + this.publishDurationHistogram = this.meter.CreateHistogram(SemanticConventions.MetricMessagingPublishDuration); + } + + public void RecordPublishMessage(ReadOnlySpan> tags) + { + this.publishMessagesCounter.Add(1, tags); + } + + public void RecordPublishDuration(double duration, ReadOnlySpan> tags) + { + this.publishDurationHistogram.Record(duration, tags); + } + + public void Dispose() + { + this.meter.Dispose(); + } +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md b/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md new file mode 100644 index 0000000000..546e654eec --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/README.md @@ -0,0 +1,11 @@ +# Confluent.Kafka client instrumentation for OpenTelemetry + +[![NuGet version badge](https://img.shields.io/nuget/v/OpenTelemetry.Instrumentation.ConfluentKafka)](https://www.nuget.org/packages/OpenTelemetry.Instrumentation.ConfluentKafka) +[![NuGet download count badge](https://img.shields.io/nuget/dt/OpenTelemetry.Instrumentation.ConfluentKafka)](https://www.nuget.org/packages/OpenTelemetry.Instrumentation.ConfluentKafka) +[![codecov.io](https://codecov.io/gh/open-telemetry/opentelemetry-dotnet-contrib/branch/main/graphs/badge.svg?flag=unittests-Instrumentation.ConfluentKafka)](https://app.codecov.io/gh/open-telemetry/opentelemetry-dotnet-contrib?flags[0]=unittests-Instrumentation.ConfluentKafka) + +Download the `OpenTelemetry.Instrumentation.ConfluentKafka` package: + +```shell +dotnet add package OpenTelemetry.Instrumentation.ConfluentKafka --prerelease +``` diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs new file mode 100644 index 0000000000..d9eada36e1 --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Consumer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace; + +/// +/// Extension methods to simplify registering of dependency instrumentation. +/// +public static partial class TracerProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder) + => AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + InstrumentedConsumerBuilder consumerBuilder) + { + Guard.ThrowIfNull(consumerBuilder); + + return AddKafkaConsumerInstrumentation(builder, name: null, consumerBuilder); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// Optional name which is used when retrieving options. + /// Optional to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaConsumerInstrumentation( + this TracerProviderBuilder builder, + string? name, + InstrumentedConsumerBuilder? consumerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableTracing); + }); + + return builder + .AddSource(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (consumerBuilder == null) + { + consumerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + consumerBuilder.Options = options.Get(name); + } + + if (consumerBuilder.Options == null) + { + consumerBuilder.Options = new ConfluentKafkaConsumerInstrumentationOptions(); + EnableTracing(consumerBuilder.Options); + } + + return new ConfluentKafkaConsumerInstrumentation(consumerBuilder); + }); + } + + private static void EnableTracing(ConfluentKafkaConsumerInstrumentationOptions options) => + options.Traces = true; +} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs new file mode 100644 index 0000000000..db977c72ae --- /dev/null +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/TracerProviderBuilderExtensions.Producer.cs @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using OpenTelemetry.Instrumentation.ConfluentKafka; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Trace; + +/// +/// Extension methods to simplify registering of dependency instrumentation. +/// +public static partial class TracerProviderBuilderExtensions +{ + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaProducerInstrumentation( + this TracerProviderBuilder builder) + => AddKafkaProducerInstrumentation(builder, name: null, producerBuilder: null); + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaProducerInstrumentation( + this TracerProviderBuilder builder, + InstrumentedProducerBuilder producerBuilder) + { + Guard.ThrowIfNull(producerBuilder); + + return AddKafkaProducerInstrumentation(builder, name: null, producerBuilder); + } + + /// + /// Enables automatic data collection of outgoing requests to Kafka. + /// + /// The type of the key. + /// The type of the value. + /// being configured. + /// Optional name which is used when retrieving options. + /// Optional to instrument. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddKafkaProducerInstrumentation( + this TracerProviderBuilder builder, + string? name, + InstrumentedProducerBuilder? producerBuilder) + { + Guard.ThrowIfNull(builder); + + name ??= Options.DefaultName; + + builder.ConfigureServices(services => + { + services.Configure>(name, EnableTracing); + }); + + return builder + .AddSource(ConfluentKafkaCommon.InstrumentationName) + .AddInstrumentation(sp => + { + if (producerBuilder == null) + { + producerBuilder = sp.GetRequiredService>(); + var options = sp.GetRequiredService>>(); + producerBuilder.Options = options.Get(name); + } + + if (producerBuilder.Options == null) + { + producerBuilder.Options = new ConfluentKafkaProducerInstrumentationOptions(); + EnableTracing(producerBuilder.Options); + } + + return new ConfluentKafkaProducerInstrumentation(producerBuilder); + }); + } + + private static void EnableTracing(ConfluentKafkaProducerInstrumentationOptions options) => + options.Traces = true; +} diff --git a/src/Shared/SemanticConventions.cs b/src/Shared/SemanticConventions.cs index 2fa0c8144c..9f1c1ee234 100644 --- a/src/Shared/SemanticConventions.cs +++ b/src/Shared/SemanticConventions.cs @@ -116,5 +116,25 @@ internal static class SemanticConventions public const string AttributeServerAddress = "server.address"; // replaces: "net.host.name" (AttributeNetHostName) public const string AttributeServerPort = "server.port"; // replaces: "net.host.port" (AttributeNetHostPort) public const string AttributeUserAgentOriginal = "user_agent.original"; // replaces: http.user_agent (AttributeHttpUserAgent) + + // v1.24.0 Messaging spans + // https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-spans.md + public const string AttributeMessagingClientId = "messaging.client_id"; + public const string AttributeMessagingDestinationName = "messaging.destination.name"; + + // v1.24.0 Messaging metrics + // https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-metrics.md + public const string MetricMessagingPublishDuration = "messaging.publish.duration"; + public const string MetricMessagingPublishMessages = "messaging.publish.messages"; + public const string MetricMessagingReceiveDuration = "messaging.receive.duration"; + public const string MetricMessagingReceiveMessages = "messaging.receive.messages"; + + // v1.24.0 Messaging (Kafka) + // https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/kafka.md + public const string AttributeMessagingKafkaConsumerGroup = "messaging.kafka.consumer.group"; + public const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; + public const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; + public const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; + #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member } diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile new file mode 100644 index 0000000000..300c474662 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile @@ -0,0 +1,19 @@ +# Create a container for running the OpenTelemetry ConfluentKafka integration tests. +# This should be run from the root of the repo: +# docker build --file test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile . + +ARG BUILD_SDK_VERSION=8.0 +ARG TEST_SDK_VERSION=8.0 + +FROM mcr.microsoft.com/dotnet/sdk:${BUILD_SDK_VERSION} AS build +ARG PUBLISH_CONFIGURATION=Release +ARG PUBLISH_FRAMEWORK=net8.0 +WORKDIR /repo +COPY . ./ +WORKDIR "/repo/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests" +RUN dotnet publish "OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj" -c "${PUBLISH_CONFIGURATION}" -f "${PUBLISH_FRAMEWORK}" -o /drop -p:IntegrationBuild=true -p:TARGET_FRAMEWORK=${PUBLISH_FRAMEWORK} + +FROM mcr.microsoft.com/dotnet/sdk:${TEST_SDK_VERSION} AS final +WORKDIR /test +COPY --from=build /drop . +ENTRYPOINT ["dotnet", "vstest", "OpenTelemetry.Instrumentation.ConfluentKafka.Tests.dll"] diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs new file mode 100644 index 0000000000..0755ec41de --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using Xunit; +using Xunit.Abstractions; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class HostedMeteringTests(ITestOutputHelper outputHelper) +{ + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() + { + List metrics = new(); + var builder = Host.CreateDefaultBuilder(); + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + new InstrumentedProducerBuilder(new ProducerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + })); + services.AddSingleton(_ => + new InstrumentedConsumerBuilder(new ConsumerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = Guid.NewGuid().ToString(), + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + })); + + services.AddOpenTelemetry().WithMetrics(metricsBuilder => + { + metricsBuilder + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + }); + + IGrouping[] groups; + using (var host = builder.Build()) + { + await host.StartAsync(); + + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var producer = host.Services.GetRequiredService>().Build()) + { + for (int i = 0; i < 100; i++) + { + producer.Produce(topic, new Message() + { + Key = $"any_key_{i}", + Value = $"any_value_{i}", + }); + outputHelper.WriteLine("produced message {0}", i); + } + + await producer.FlushAsync(); + } + + host.Services.GetRequiredService().EnsureMetricsAreFlushed(); + + groups = metrics.GroupBy(x => x.Name).ToArray(); + + Assert.Equal(2, groups.Length); + + metrics.Clear(); + + using (var consumer = host.Services.GetRequiredService>().Build()) + { + consumer.Subscribe(topic); + + int j = 0; + while (true) + { + var consumerResult = consumer.Consume(); + if (consumerResult == null) + { + continue; + } + + if (consumerResult.IsPartitionEOF) + { + break; + } + + outputHelper.WriteLine("consumed message {0}", j); + j++; + } + } + + host.Services.GetRequiredService().EnsureMetricsAreFlushed(); + + await host.StopAsync(); + } + + groups = metrics.GroupBy(x => x.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs new file mode 100644 index 0000000000..1d7724e873 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs @@ -0,0 +1,127 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using OpenTelemetry.Trace; +using Xunit; +using Xunit.Abstractions; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class HostedTracingAndMeteringTests(ITestOutputHelper outputHelper) +{ + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() + { + List metrics = new(); + List activities = new(); + var builder = Host.CreateDefaultBuilder(); + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + new InstrumentedProducerBuilder(new ProducerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + })); + services.AddSingleton(_ => + new InstrumentedConsumerBuilder(new ConsumerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = Guid.NewGuid().ToString(), + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + })); + + services.AddOpenTelemetry().WithTracing(tracingBuilder => + { + tracingBuilder + .AddInMemoryExporter(activities) + .SetSampler(new TestSampler()) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }).WithMetrics(metricsBuilder => + { + metricsBuilder + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + }); + + IGrouping[] groups = null; + using (var host = builder.Build()) + { + await host.StartAsync(); + + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var producer = host.Services.GetRequiredService>().Build()) + { + for (int i = 0; i < 100; i++) + { + producer.Produce(topic, new Message() + { + Key = $"any_key_{i}", + Value = $"any_value_{i}", + }); + outputHelper.WriteLine("produced message {0}", i); + } + + await producer.FlushAsync(); + } + + host.Services.GetRequiredService().EnsureMetricsAreFlushed(); + + groups = metrics.GroupBy(x => x.Name).ToArray(); + + Assert.Equal(2, groups.Length); + + metrics.Clear(); + + using (var consumer = host.Services.GetRequiredService>().Build()) + { + consumer.Subscribe(topic); + + int j = 0; + while (true) + { + var consumerResult = consumer.Consume(); + if (consumerResult == null) + { + continue; + } + + if (consumerResult.IsPartitionEOF) + { + break; + } + + outputHelper.WriteLine("consumed message {0}", j); + j++; + } + + Assert.Equal(100, j); + } + + await host.StopAsync(); + + Assert.Equal(200, activities.Count); + + host.Services.GetRequiredService().EnsureMetricsAreFlushed(); + } + + groups = metrics.GroupBy(x => x.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs new file mode 100644 index 0000000000..4bc03e4991 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenTelemetry.Tests; +using OpenTelemetry.Trace; +using Xunit; +using Xunit.Abstractions; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class HostedTracingTests(ITestOutputHelper outputHelper) +{ + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() + { + List activities = new(); + var builder = Host.CreateDefaultBuilder(); + builder.ConfigureServices(services => + { + services.AddSingleton(_ => + new InstrumentedProducerBuilder(new ProducerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + })); + services.AddSingleton(_ => + new InstrumentedConsumerBuilder(new ConsumerConfig() + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = Guid.NewGuid().ToString(), + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + })); + + services.AddOpenTelemetry().WithTracing(tracingBuilder => + { + tracingBuilder + .AddInMemoryExporter(activities) + .SetSampler(new TestSampler()) + .AddKafkaProducerInstrumentation() + .AddKafkaConsumerInstrumentation(); + }); + }); + + using (var host = builder.Build()) + { + await host.StartAsync(); + + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var producer = host.Services.GetRequiredService>().Build()) + { + for (int i = 0; i < 100; i++) + { + producer.Produce(topic, new Message() { Key = $"any_key_{i}", Value = $"any_value_{i}", }); + outputHelper.WriteLine("produced message {0}", i); + } + + await producer.FlushAsync(); + } + + using (var consumer = host.Services.GetRequiredService>().Build()) + { + consumer.Subscribe(topic); + + int j = 0; + while (true) + { + var consumerResult = consumer.Consume(); + if (consumerResult == null) + { + continue; + } + + if (consumerResult.IsPartitionEOF) + { + break; + } + + outputHelper.WriteLine("consumed message {0}", j); + j++; + } + + Assert.Equal(100, j); + } + + await host.StopAsync(); + } + + Assert.Equal(200, activities.Count); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs new file mode 100644 index 0000000000..2c375e0948 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Threading.Tasks; +using Confluent.Kafka; +using OpenTelemetry.Tests; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +internal static class KafkaHelpers +{ + public const string KafkaEndPointEnvVarName = "OTEL_KAFKAENDPOINT"; + + public static readonly string KafkaEndPoint = SkipUnlessEnvVarFoundTheoryAttribute.GetEnvironmentVariable(KafkaEndPointEnvVarName); + + public static async Task ProduceTestMessageAsync() + { + string topic = $"otel-topic-{Guid.NewGuid()}"; + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaEndPoint, + }; + ProducerBuilder producerBuilder = new(producerConfig); + IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + return topic; + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs new file mode 100644 index 0000000000..c26f38743a --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeterProviderExtensions.cs @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +internal static class MeterProviderExtensions +{ + public static void EnsureMetricsAreFlushed(this MeterProvider meterProvider) + { + bool done; + do + { + done = meterProvider.ForceFlush(); + } + while (!done); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs new file mode 100644 index 0000000000..f3843ab3c9 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs @@ -0,0 +1,139 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using OpenTelemetry.Metrics; +using OpenTelemetry.Tests; +using Xunit; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class MeteringTests +{ + /* + To run the integration tests, set the OTEL_KAFKAENDPOINT machine-level environment variable to a valid Kafka endpoint. + + To use Docker... + 1) Run: docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local + 2) Set OTEL_KAFKAENDPOINT as: localhost:9092 + */ + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var metrics = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + IProducer producer = producerBuilder.Build(); + producer.Produce(topic, new Message + { + Value = "any_value", + }); + + await producer.FlushAsync(); + + meterProvider.EnsureMetricsAreFlushed(); + } + + var groups = metrics.GroupBy(m => m.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var metrics = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddInMemoryExporter(metrics) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + + await producer.FlushAsync(); + + meterProvider.EnsureMetricsAreFlushed(); + } + + var groups = metrics.GroupBy(m => m.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutTimespanTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + + var metrics = new List(); + using (var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddInMemoryExporter(metrics) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using (IConsumer consumer = consumerBuilder.Build()) + { + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(); + + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + meterProvider.EnsureMetricsAreFlushed(); + } + + var groups = metrics.GroupBy(m => m.Name).ToArray(); + + Assert.Equal(2, groups.Length); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj new file mode 100644 index 0000000000..defd085a4a --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj @@ -0,0 +1,28 @@ + + + Unit test project for OpenTelemetry ConfluentKafka instrumentation + + $(SupportedNetTargets) + $(TargetFrameworks);$(NetFrameworkMinimumSupportedVersion) + disable + + + + + + + + + + + + + + + + + + + + + diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs new file mode 100644 index 0000000000..9066260da0 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Threading.Tasks; +using Confluent.Kafka; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +internal static class ProducerExtensions +{ + public static async Task FlushAsync(this IProducer producer) + { + while (producer.Flush(TimeSpan.FromMilliseconds(100)) != 0) + { + await Task.Delay(100); + } + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs new file mode 100644 index 0000000000..7d322862d7 --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs @@ -0,0 +1,308 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using Confluent.Kafka; +using OpenTelemetry.Tests; +using OpenTelemetry.Trace; +using Xunit; + +namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; + +[Collection("Kafka")] +public class TracingTests +{ + /* + To run the integration tests, set the OTEL_KAFKAENDPOINT machine-level environment variable to a valid Kafka endpoint. + + To use Docker... + 1) Run: docker run -d --name kafka -p 9092:9092 confluentinc/confluent-local + 2) Set OTEL_KAFKAENDPOINT as: localhost:9092 + */ + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(topic, new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicProduceAsyncToTopicPartitionTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + await producer.ProduceAsync(new TopicPartition(topic, new Partition(0)), new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public void BasicProduceSyncToTopicTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + producer.Produce(topic, new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public void BasicProduceSyncToTopicPartitionTest() + { + ProducerConfig producerConfig = new ProducerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + }; + InstrumentedProducerBuilder producerBuilder = new(producerConfig); + var sampler = new TestSampler(); + var activities = new List(); + string topic = $"otel-topic-{Guid.NewGuid()}"; + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaProducerInstrumentation(producerBuilder) + .Build()) + { + using IProducer producer = producerBuilder.Build(); + producer.Produce(new TopicPartition(topic, new Partition(0)), new Message + { + Value = "any_value", + }); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " publish"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("publish", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithCancellationTokenTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var activities = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " receive"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutMsTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var activities = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(100); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " receive"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } + + [Trait("CategoryName", "KafkaIntegrationTests")] + [SkipUnlessEnvVarFoundFact(KafkaHelpers.KafkaEndPointEnvVarName)] + public async Task BasicConsumeWithTimeoutTimespanTest() + { + string topic = await KafkaHelpers.ProduceTestMessageAsync(); + + ConsumerConfig consumerConfig = new ConsumerConfig + { + BootstrapServers = KafkaHelpers.KafkaEndPoint, + GroupId = "test-consumer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnablePartitionEof = true, + }; + InstrumentedConsumerBuilder consumerBuilder = new(consumerConfig); + var sampler = new TestSampler(); + var activities = new List(); + using (Sdk.CreateTracerProviderBuilder() + .AddInMemoryExporter(activities) + .SetSampler(sampler) + .AddKafkaConsumerInstrumentation(consumerBuilder) + .Build()) + { + using IConsumer consumer = consumerBuilder.Build(); + consumer.Subscribe(topic); + while (true) + { + var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (consumeResult == null) + { + continue; + } + + if (consumeResult.IsPartitionEOF) + { + break; + } + } + + consumer.Close(); + } + + Assert.Contains(activities, activity => activity.DisplayName == topic + " receive"); + var activity = Assert.Single(activities); + Assert.Equal("kafka", activity.GetTagValue(SemanticConventions.AttributeMessagingSystem)); + Assert.Equal("receive", activity.GetTagValue(SemanticConventions.AttributeMessagingOperation)); + Assert.Equal(topic, activity.GetTagValue("messaging.destination.name")); + Assert.Equal(0, activity.GetTagValue("messaging.kafka.destination.partition")); + Assert.Equal(0L, activity.GetTagValue("messaging.kafka.message.offset")); + Assert.Equal("test-consumer-group", activity.GetTagValue("messaging.kafka.consumer.group")); + } +} diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml new file mode 100644 index 0000000000..4761a9f0bb --- /dev/null +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml @@ -0,0 +1,24 @@ +# Start a kafka container and then run OpenTelemetry ConfluentKafka integration tests. +# This should be run from the root of the repo: +# opentelemetry>docker-compose --file=test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml --project-directory=. up --exit-code-from=tests --build +version: '3.7' + +services: + kafka: + image: confluentinc/confluent-local + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://kafka:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_INTERNAL://kafka:9092,PLAINTEXT_HOST://localhost:9092 + ports: + - "9092:9092" + + tests: + build: + context: . + dockerfile: ./test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile + command: --TestCaseFilter:CategoryName=KafkaIntegrationTests + environment: + - OTEL_KAFKAENDPOINT=kafka:9092 + depends_on: + - kafka From 9d0122cd9a5589e2c28986d3ad6d47456201865f Mon Sep 17 00:00:00 2001 From: g7ed6e Date: Wed, 19 Jun 2024 18:08:38 +0200 Subject: [PATCH 2/9] refactor --- .../InstrumentedConsumer.cs | 140 +++++++++--------- .../InstrumentedProducer.cs | 59 +++----- 2 files changed, 96 insertions(+), 103 deletions(-) diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs index d279d71c48..387075e9b4 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs @@ -56,63 +56,82 @@ public void SetSaslCredentials(string username, string password) public ConsumeResult? Consume(int millisecondsTimeout) { DateTimeOffset start = DateTimeOffset.UtcNow; + ConsumeResult? result = null; + ConsumeResult consumeResult = default; + string? errorType = null; try { - var result = this.consumer.Consume(millisecondsTimeout); - if (result is { IsPartitionEOF: false }) - { - this.InstrumentConsumption(start, result); - } - + result = this.consumer.Consume(millisecondsTimeout); + consumeResult = ExtractConsumeResult(result); return result; } catch (ConsumeException e) { - this.InstrumentConsumptionError(start, e); + (consumeResult, errorType) = ExtractConsumeResult(e); throw; } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, end, consumeResult, errorType); + } + } } public ConsumeResult? Consume(CancellationToken cancellationToken = default) { DateTimeOffset start = DateTimeOffset.UtcNow; - + ConsumeResult? result = null; + ConsumeResult consumeResult = default; + string? errorType = null; try { - var result = this.consumer.Consume(cancellationToken); - if (result is { IsPartitionEOF: false }) - { - this.InstrumentConsumption(start, result); - } - + result = this.consumer.Consume(cancellationToken); + consumeResult = ExtractConsumeResult(result); return result; } catch (ConsumeException e) { - this.InstrumentConsumptionError(start, e); + (consumeResult, errorType) = ExtractConsumeResult(e); throw; } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, end, consumeResult, errorType); + } + } } public ConsumeResult? Consume(TimeSpan timeout) { DateTimeOffset start = DateTimeOffset.UtcNow; - + ConsumeResult? result = null; + ConsumeResult consumeResult = default; + string? errorType = null; try { - var result = this.consumer.Consume(timeout); - if (result is { IsPartitionEOF: false }) - { - this.InstrumentConsumption(start, result); - } - + result = this.consumer.Consume(timeout); + consumeResult = ExtractConsumeResult(result); return result; } catch (ConsumeException e) { - this.InstrumentConsumptionError(start, e); + (consumeResult, errorType) = ExtractConsumeResult(e); throw; } + finally + { + DateTimeOffset end = DateTimeOffset.UtcNow; + if (result is { IsPartitionEOF: false }) + { + this.InstrumentConsumption(start, end, consumeResult, errorType); + } + } } public void Subscribe(IEnumerable topics) @@ -259,6 +278,20 @@ private static IEnumerable ExtractTraceContext(Headers? headers, string } } + private static ConsumeResult ExtractConsumeResult(ConsumeResult result) => result switch + { + null => new ConsumeResult(null, null), + { Message: null } => new ConsumeResult(result.TopicPartitionOffset, null), + _ => new ConsumeResult(result.TopicPartitionOffset, result.Message.Headers, result.Message.Key), + }; + + private static (ConsumeResult ConsumeResult, string ErrorType) ExtractConsumeResult(ConsumeException exception) => exception switch + { + { ConsumerRecord: null } => (new ConsumeResult(null, null), FormatConsumeException(exception)), + { ConsumerRecord.Message: null } => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, null), FormatConsumeException(exception)), + _ => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, exception.ConsumerRecord.Message.Headers, exception.ConsumerRecord.Message.Key), FormatConsumeException(exception)), + }; + private static IEnumerable> GetTags(string topic, int? partition = null, string? errorType = null) { yield return new KeyValuePair( @@ -292,72 +325,43 @@ private void RecordReceive(TopicPartition topicPartition, TimeSpan duration, str this.consumerMeterInstrumentation.RecordReceiveDuration(duration.TotalSeconds, tags); } - private void InstrumentConsumption(DateTimeOffset startTime, ConsumeResult result) + private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endTime, ConsumeResult consumeResult, string? errorType) { - Activity? activity = null; if (this.options.Traces) { - ConsumeResult consumeResult = new(result.TopicPartitionOffset, result.Message.Headers, result.Message.Key); PropagationContext propagationContext = consumeResult.Headers != null ? ExtractPropagationContext(consumeResult.Headers) : default; - var spanName = string.IsNullOrEmpty(consumeResult.TopicPartitionOffset?.Topic) - ? ReceiveOperationName - : string.Concat(consumeResult.TopicPartitionOffset!.Topic, " ", ReceiveOperationName); - - activity = this.StartReceiveActivity(spanName, propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); - activity?.Stop(); - } - if (this.options.Metrics) - { - TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - startTime; - this.RecordReceive(result.TopicPartition, duration); - } - - activity?.Dispose(); - } - - private void InstrumentConsumptionError(DateTimeOffset startTime, ConsumeException exception) - { - Activity? activity = null; - string? errorType = null; - if (this.options.Traces) - { - ConsumeResult consumeResult = new(exception.ConsumerRecord.TopicPartitionOffset, exception.ConsumerRecord.Message.Headers, exception.ConsumerRecord.Message.Key); - PropagationContext propagationContext = consumeResult.Headers != null - ? ExtractPropagationContext(consumeResult.Headers) - : default; - var spanName = string.IsNullOrEmpty(consumeResult.TopicPartitionOffset?.Topic) - ? ReceiveOperationName - : string.Concat(consumeResult.TopicPartitionOffset!.Topic, " ", ReceiveOperationName); - - activity = this.StartReceiveActivity(spanName, propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); - errorType = FormatConsumeException(exception); + using Activity? activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key); if (activity != null) { - activity.SetStatus(ActivityStatusCode.Error); - if (activity.IsAllDataRequested) + if (errorType != null) { - activity.SetTag(SemanticConventions.AttributeErrorType, errorType); + activity.SetStatus(ActivityStatusCode.Error); + if (activity.IsAllDataRequested) + { + activity.SetTag(SemanticConventions.AttributeErrorType, errorType); + } } - activity.Stop(); + activity.SetEndTime(endTime.UtcDateTime); } } if (this.options.Metrics) { - TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - startTime; - errorType ??= FormatConsumeException(exception); - this.RecordReceive(exception.ConsumerRecord.TopicPartition, duration, errorType); + TimeSpan duration = endTime - startTime; + this.RecordReceive(consumeResult.TopicPartitionOffset!.TopicPartition, duration, errorType); } - - activity?.Dispose(); } - private Activity? StartReceiveActivity(string spanName, PropagationContext propagationContext, DateTimeOffset start, TopicPartitionOffset? topicPartitionOffset, object? key) + private Activity? StartReceiveActivity(PropagationContext propagationContext, DateTimeOffset start, TopicPartitionOffset? topicPartitionOffset, object? key) { + var spanName = string.IsNullOrEmpty(topicPartitionOffset?.Topic) + ? ReceiveOperationName + : string.Concat(topicPartitionOffset!.Topic, " ", ReceiveOperationName); + ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid() ? new[] { new ActivityLink(propagationContext.ActivityContext) } : Array.Empty(); diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs index 31d238df61..3df5553eb2 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs @@ -49,7 +49,7 @@ public async Task> ProduceAsync( CancellationToken cancellationToken = default) { DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartActivity(PublishOperationName, topic, message); + using Activity? activity = this.StartPublishActivity(start, topic, message); if (activity != null) { this.InjectActivity(activity, message); @@ -77,8 +77,9 @@ public async Task> ProduceAsync( } finally { - TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; - activity?.Stop(); + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; if (this.options.Metrics) { @@ -95,7 +96,7 @@ public async Task> ProduceAsync( CancellationToken cancellationToken = default) { DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartActivity(topicPartition, message); + using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); if (activity != null) { this.InjectActivity(activity, message); @@ -123,8 +124,9 @@ public async Task> ProduceAsync( } finally { - activity?.Stop(); - TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; if (this.options.Metrics) { @@ -138,7 +140,7 @@ public async Task> ProduceAsync( public void Produce(string topic, Message message, Action>? deliveryHandler = null) { DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartActivity(PublishOperationName, topic, message); + using Activity? activity = this.StartPublishActivity(start, topic, message); if (activity != null) { this.InjectActivity(activity, message); @@ -165,8 +167,9 @@ public void Produce(string topic, Message message, Action message, Action message, Action>? deliveryHandler = null) { DateTimeOffset start = DateTimeOffset.UtcNow; - using Activity? activity = this.StartActivity(topicPartition, message); + using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition); if (activity != null) { this.InjectActivity(activity, message); @@ -205,8 +208,9 @@ public void Produce(TopicPartition topicPartition, Message message } finally { - activity?.Stop(); - TimeSpan duration = activity?.Duration ?? DateTimeOffset.UtcNow - start; + DateTimeOffset end = DateTimeOffset.UtcNow; + activity?.SetEndTime(end.UtcDateTime); + TimeSpan duration = end - start; if (this.options.Metrics) { @@ -317,9 +321,10 @@ private void RecordPublish(TopicPartition topicPartition, TimeSpan duration, str this.producerMeterInstrumentation.RecordPublishDuration(duration.TotalSeconds, tags); } - private Activity? StartActivity(string operation, string topic, Message message) + private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message message, int? partition = null) { - var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(string.Concat(topic, " ", operation), ActivityKind.Producer); + var spanName = string.Concat(topic, " ", PublishOperationName); + var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(name: spanName, kind: ActivityKind.Producer, startTime: start); if (activity == null) { return null; @@ -330,33 +335,17 @@ private void RecordPublish(TopicPartition topicPartition, TimeSpan duration, str activity.SetTag(SemanticConventions.AttributeMessagingSystem, KafkaMessagingSystem); activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name); activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topic); - activity.SetTag(SemanticConventions.AttributeMessagingOperation, operation); + activity.SetTag(SemanticConventions.AttributeMessagingOperation, PublishOperationName); if (message.Key != null) { activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, message.Key); } - } - - return activity; - } - - private Activity? StartActivity(TopicPartition topicPartition, Message message) - { - if (!this.options.Traces) - { - return null; - } - var activity = this.StartActivity(PublishOperationName, topicPartition.Topic, message); - if (activity == null) - { - return null; - } - - if (activity.IsAllDataRequested) - { - activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, topicPartition.Partition.Value); + if (partition is not null) + { + activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, partition); + } } return activity; From f3c4df8b5e88dcee7899297011b71478f98c9148 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 25 Jun 2024 10:24:17 -0700 Subject: [PATCH 3/9] Add MinVerTagPrefix in kafka proj. --- .../OpenTelemetry.Instrumentation.ConfluentKafka.proj | 1 + opentelemetry-dotnet-contrib.sln | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj index d0295e54da..0d0a145639 100644 --- a/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj +++ b/build/Projects/OpenTelemetry.Instrumentation.ConfluentKafka.proj @@ -2,6 +2,7 @@ $([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.Parent.FullName) + Instrumentation.ConfluentKafka- diff --git a/opentelemetry-dotnet-contrib.sln b/opentelemetry-dotnet-contrib.sln index 65a1740bef..f7508674db 100644 --- a/opentelemetry-dotnet-contrib.sln +++ b/opentelemetry-dotnet-contrib.sln @@ -287,9 +287,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Projects", "Projects", "{04 ProjectSection(SolutionItems) = preProject build\Projects\Component.proj = build\Projects\Component.proj build\Projects\OpenTelemetry.Contrib.Shared.Tests.proj = build\Projects\OpenTelemetry.Contrib.Shared.Tests.proj + build\Projects\OpenTelemetry.Exporter.InfluxDB.proj = build\Projects\OpenTelemetry.Exporter.InfluxDB.proj + build\Projects\OpenTelemetry.Extensions.Enrichment.proj = build\Projects\OpenTelemetry.Extensions.Enrichment.proj build\Projects\OpenTelemetry.Instrumentation.AspNet.proj = build\Projects\OpenTelemetry.Instrumentation.AspNet.proj build\Projects\OpenTelemetry.Instrumentation.AspNetCore.proj = build\Projects\OpenTelemetry.Instrumentation.AspNetCore.proj + build\Projects\OpenTelemetry.Instrumentation.ConfluentKafka.proj = build\Projects\OpenTelemetry.Instrumentation.ConfluentKafka.proj build\Projects\OpenTelemetry.Instrumentation.EventCounters.proj = build\Projects\OpenTelemetry.Instrumentation.EventCounters.proj + build\Projects\OpenTelemetry.Instrumentation.GrpcCore.proj = build\Projects\OpenTelemetry.Instrumentation.GrpcCore.proj build\Projects\OpenTelemetry.Instrumentation.Owin.proj = build\Projects\OpenTelemetry.Instrumentation.Owin.proj build\Projects\OpenTelemetry.Instrumentation.Process.proj = build\Projects\OpenTelemetry.Instrumentation.Process.proj build\Projects\OpenTelemetry.Instrumentation.Runtime.proj = build\Projects\OpenTelemetry.Instrumentation.Runtime.proj @@ -377,13 +381,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ISSUE_TEMPLATE", "ISSUE_TEM .github\ISSUE_TEMPLATE\feature_request.yml = .github\ISSUE_TEMPLATE\feature_request.yml EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka", "src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj", "{96341E23-990E-4144-A7E3-9EF0DAFF3232}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.Instrumentation.ConfluentKafka", "src\OpenTelemetry.Instrumentation.ConfluentKafka\OpenTelemetry.Instrumentation.ConfluentKafka.csproj", "{96341E23-990E-4144-A7E3-9EF0DAFF3232}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenTelemetry.Instrumentation.ConfluentKafka.Tests", "test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj", "{BE40900A-2859-471D-8802-21DFD73DDAA7}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.Instrumentation.ConfluentKafka.Tests", "test\OpenTelemetry.Instrumentation.ConfluentKafka.Tests\OpenTelemetry.Instrumentation.ConfluentKafka.Tests.csproj", "{BE40900A-2859-471D-8802-21DFD73DDAA7}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kafka", "kafka", "{3A464E7A-42F3-44B0-B8D7-80521A7704A6}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Examples.ConfluentKafka", "examples\kafka\Examples.ConfluentKafka.csproj", "{9B994669-E839-4C42-A0F1-DF9DD058C1DC}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Examples.ConfluentKafka", "examples\kafka\Examples.ConfluentKafka.csproj", "{9B994669-E839-4C42-A0F1-DF9DD058C1DC}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution From a4fc992e507bea855489b7034b8129a8ec0c6120 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 25 Jun 2024 10:45:16 -0700 Subject: [PATCH 4/9] Code review. --- .../ConfluentKafkaCommon.cs | 7 ++ .../ConsumerMeterInstrumentation.cs | 36 ---------- .../InstrumentedConsumer.cs | 53 ++++++++------- .../InstrumentedProducer.cs | 66 ++++++++++--------- .../ProducerMeterInstrumentation.cs | 36 ---------- 5 files changed, 72 insertions(+), 126 deletions(-) delete mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs delete mode 100644 src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs index 2a8b86016e..beb4c4812d 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConfluentKafkaCommon.cs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 using System.Diagnostics; +using System.Diagnostics.Metrics; using OpenTelemetry.Internal; +using OpenTelemetry.Trace; namespace OpenTelemetry.Instrumentation.ConfluentKafka; @@ -11,4 +13,9 @@ internal static class ConfluentKafkaCommon internal static readonly string InstrumentationName = typeof(ConfluentKafkaCommon).Assembly.GetName().Name!; internal static readonly string InstrumentationVersion = typeof(ConfluentKafkaCommon).Assembly.GetPackageVersion(); internal static readonly ActivitySource ActivitySource = new(InstrumentationName, InstrumentationVersion); + internal static readonly Meter Meter = new(InstrumentationName, InstrumentationVersion); + internal static readonly Counter ReceiveMessagesCounter = Meter.CreateCounter(SemanticConventions.MetricMessagingReceiveMessages); + internal static readonly Histogram ReceiveDurationHistogram = Meter.CreateHistogram(SemanticConventions.MetricMessagingReceiveDuration); + internal static readonly Counter PublishMessagesCounter = Meter.CreateCounter(SemanticConventions.MetricMessagingPublishMessages); + internal static readonly Histogram PublishDurationHistogram = Meter.CreateHistogram(SemanticConventions.MetricMessagingPublishDuration); } diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs deleted file mode 100644 index 912fdde9f1..0000000000 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ConsumerMeterInstrumentation.cs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using System.Diagnostics.Metrics; -using OpenTelemetry.Trace; - -namespace OpenTelemetry.Instrumentation.ConfluentKafka; - -internal sealed class ConsumerMeterInstrumentation : IDisposable -{ - private readonly Meter meter; - private readonly Counter receiveMessagesCounter; - private readonly Histogram receiveDurationHistogram; - - public ConsumerMeterInstrumentation() - { - this.meter = new Meter(ConfluentKafkaCommon.InstrumentationName, ConfluentKafkaCommon.InstrumentationVersion); - this.receiveMessagesCounter = this.meter.CreateCounter(SemanticConventions.MetricMessagingReceiveMessages); - this.receiveDurationHistogram = this.meter.CreateHistogram(SemanticConventions.MetricMessagingReceiveDuration); - } - - public void RecordReceivedMessage(ReadOnlySpan> tags) - { - this.receiveMessagesCounter.Add(1, tags); - } - - public void RecordReceiveDuration(double duration, ReadOnlySpan> tags) - { - this.receiveDurationHistogram.Record(duration, tags); - } - - public void Dispose() - { - this.meter.Dispose(); - } -} diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs index 387075e9b4..8fa5fa8b02 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedConsumer.cs @@ -13,7 +13,6 @@ internal class InstrumentedConsumer : IConsumer { private const string ReceiveOperationName = "receive"; private const string KafkaMessagingSystem = "kafka"; - private readonly ConsumerMeterInstrumentation consumerMeterInstrumentation = new(); private readonly IConsumer consumer; private readonly ConfluentKafkaConsumerInstrumentationOptions options; @@ -39,7 +38,6 @@ public InstrumentedConsumer(IConsumer consumer, ConfluentKafkaCons public void Dispose() { - this.consumerMeterInstrumentation.Dispose(); this.consumer.Dispose(); } @@ -292,37 +290,44 @@ private static IEnumerable ExtractTraceContext(Headers? headers, string _ => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, exception.ConsumerRecord.Message.Headers, exception.ConsumerRecord.Message.Key), FormatConsumeException(exception)), }; - private static IEnumerable> GetTags(string topic, int? partition = null, string? errorType = null) - { - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingOperation, - ReceiveOperationName); - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingSystem, - KafkaMessagingSystem); - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingDestinationName, - topic); + private static void GetTags(string topic, out TagList tags, int? partition = null, string? errorType = null) + { + tags = new TagList() + { + new KeyValuePair( + SemanticConventions.AttributeMessagingOperation, + ReceiveOperationName), + new KeyValuePair( + SemanticConventions.AttributeMessagingSystem, + KafkaMessagingSystem), + new KeyValuePair( + SemanticConventions.AttributeMessagingDestinationName, + topic), + }; + if (partition is not null) { - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingKafkaDestinationPartition, - partition); + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeMessagingKafkaDestinationPartition, + partition)); } if (errorType is not null) { - yield return new KeyValuePair( - SemanticConventions.AttributeErrorType, - errorType); + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeErrorType, + errorType)); } } - private void RecordReceive(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) + private static void RecordReceive(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) { - var tags = GetTags(topicPartition.Topic, partition: topicPartition.Partition, errorType).ToArray(); - this.consumerMeterInstrumentation.RecordReceivedMessage(tags); - this.consumerMeterInstrumentation.RecordReceiveDuration(duration.TotalSeconds, tags); + GetTags(topicPartition.Topic, out var tags, partition: topicPartition.Partition, errorType); + + ConfluentKafkaCommon.ReceiveMessagesCounter.Add(1, in tags); + ConfluentKafkaCommon.ReceiveDurationHistogram.Record(duration.TotalSeconds, in tags); } private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endTime, ConsumeResult consumeResult, string? errorType) @@ -352,7 +357,7 @@ private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endT if (this.options.Metrics) { TimeSpan duration = endTime - startTime; - this.RecordReceive(consumeResult.TopicPartitionOffset!.TopicPartition, duration, errorType); + RecordReceive(consumeResult.TopicPartitionOffset!.TopicPartition, duration, errorType); } } diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs index 3df5553eb2..aa7d31ba48 100644 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs +++ b/src/OpenTelemetry.Instrumentation.ConfluentKafka/InstrumentedProducer.cs @@ -15,7 +15,6 @@ internal sealed class InstrumentedProducer : IProducer producer; private readonly ConfluentKafkaProducerInstrumentationOptions options; @@ -83,7 +82,7 @@ public async Task> ProduceAsync( if (this.options.Metrics) { - this.RecordPublish(topic, duration, errorType); + RecordPublish(topic, duration, errorType); } } @@ -130,7 +129,7 @@ public async Task> ProduceAsync( if (this.options.Metrics) { - this.RecordPublish(topicPartition, duration, errorType); + RecordPublish(topicPartition, duration, errorType); } } @@ -173,7 +172,7 @@ public void Produce(string topic, Message message, Action message if (this.options.Metrics) { - this.RecordPublish(topicPartition, duration, errorType); + RecordPublish(topicPartition, duration, errorType); } } } @@ -271,7 +270,6 @@ public void SendOffsetsToTransaction(IEnumerable offsets, public void Dispose() { - this.producerMeterInstrumentation.Dispose(); this.producer.Dispose(); } @@ -281,44 +279,52 @@ private static string FormatProduceException(ProduceException prod private static string FormatArgumentException(ArgumentException argumentException) => $"ArgumentException: {argumentException.ParamName}"; - private static IEnumerable> GetTags(string topic, int? partition = null, string? errorType = null) + private static void GetTags(string topic, out TagList tags, int? partition = null, string? errorType = null) { - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingOperation, - PublishOperationName); - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingSystem, - KafkaMessagingSystem); - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingDestinationName, - topic); + tags = new TagList() + { + new KeyValuePair( + SemanticConventions.AttributeMessagingOperation, + PublishOperationName), + new KeyValuePair( + SemanticConventions.AttributeMessagingSystem, + KafkaMessagingSystem), + new KeyValuePair( + SemanticConventions.AttributeMessagingDestinationName, + topic), + }; + if (partition is not null) { - yield return new KeyValuePair( - SemanticConventions.AttributeMessagingKafkaDestinationPartition, - partition); + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeMessagingKafkaDestinationPartition, + partition)); } if (errorType is not null) { - yield return new KeyValuePair( - SemanticConventions.AttributeErrorType, - errorType); + tags.Add( + new KeyValuePair( + SemanticConventions.AttributeErrorType, + errorType)); } } - private void RecordPublish(string topic, TimeSpan duration, string? errorType = null) + private static void RecordPublish(string topic, TimeSpan duration, string? errorType = null) { - var tags = GetTags(topic, partition: null, errorType).ToArray(); - this.producerMeterInstrumentation.RecordPublishMessage(tags); - this.producerMeterInstrumentation.RecordPublishDuration(duration.TotalSeconds, tags); + GetTags(topic, out var tags, partition: null, errorType); + + ConfluentKafkaCommon.PublishMessagesCounter.Add(1, in tags); + ConfluentKafkaCommon.PublishDurationHistogram.Record(duration.TotalSeconds, in tags); } - private void RecordPublish(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) + private static void RecordPublish(TopicPartition topicPartition, TimeSpan duration, string? errorType = null) { - var tags = GetTags(topicPartition.Topic, partition: topicPartition.Partition, errorType).ToArray(); - this.producerMeterInstrumentation.RecordPublishMessage(tags); - this.producerMeterInstrumentation.RecordPublishDuration(duration.TotalSeconds, tags); + GetTags(topicPartition.Topic, out var tags, partition: topicPartition.Partition, errorType); + + ConfluentKafkaCommon.PublishMessagesCounter.Add(1, in tags); + ConfluentKafkaCommon.PublishDurationHistogram.Record(duration.TotalSeconds, in tags); } private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message message, int? partition = null) diff --git a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs b/src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs deleted file mode 100644 index 4288b92fa2..0000000000 --- a/src/OpenTelemetry.Instrumentation.ConfluentKafka/ProducerMeterInstrumentation.cs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using System.Diagnostics.Metrics; -using OpenTelemetry.Trace; - -namespace OpenTelemetry.Instrumentation.ConfluentKafka; - -internal sealed class ProducerMeterInstrumentation : IDisposable -{ - private readonly Meter meter; - private readonly Counter publishMessagesCounter; - private readonly Histogram publishDurationHistogram; - - public ProducerMeterInstrumentation() - { - this.meter = new Meter(ConfluentKafkaCommon.InstrumentationName, ConfluentKafkaCommon.InstrumentationVersion); - this.publishMessagesCounter = this.meter.CreateCounter(SemanticConventions.MetricMessagingPublishMessages); - this.publishDurationHistogram = this.meter.CreateHistogram(SemanticConventions.MetricMessagingPublishDuration); - } - - public void RecordPublishMessage(ReadOnlySpan> tags) - { - this.publishMessagesCounter.Add(1, tags); - } - - public void RecordPublishDuration(double duration, ReadOnlySpan> tags) - { - this.publishDurationHistogram.Record(duration, tags); - } - - public void Dispose() - { - this.meter.Dispose(); - } -} From 400fdc15ccc3521bac45b6fa64f873dd402c77b9 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 25 Jun 2024 11:02:01 -0700 Subject: [PATCH 5/9] Add integration tests to CI. --- .github/workflows/ci.yml | 14 +++++++++++++- .github/workflows/integration.yml | 20 +++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dac9fe22d8..1f0778213e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -232,7 +232,17 @@ jobs: uses: ./.github/workflows/Component.BuildTest.yml with: project-name: Component[OpenTelemetry.Instrumentation.ConfluentKafka] - code-cov-name: Instrumentation.ConfluentKafka + code-cov-name: Instrumentation.ConfluentKafka + + build-test-instrumentation-confluentkafka-integration: + needs: detect-changes + if: | + contains(needs.detect-changes.outputs.changes, 'instrumentation-confluentkafka') + || contains(needs.detect-changes.outputs.changes, 'build') + || contains(needs.detect-changes.outputs.changes, 'shared') + uses: ./.github/workflows/integration.yml + with: + job: kafka-integration-test build-test-instrumentation-elasticsearchclient: needs: detect-changes @@ -387,6 +397,8 @@ jobs: || contains(needs.detect-changes.outputs.changes, 'build') || contains(needs.detect-changes.outputs.changes, 'shared') uses: ./.github/workflows/integration.yml + with: + job: redis-integration-test build-test-instrumentation-wcf: needs: detect-changes diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 17fc8a9da9..d139ad213b 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -2,9 +2,14 @@ name: Integration Build OpenTelemetry.Instrumentation.StackExchangeRedis on: workflow_call: + inputs: + job: + required: true + type: string jobs: redis-integration-test: + if: inputs.job == 'all' || inputs.job == 'redis-integration-test' runs-on: ubuntu-latest strategy: fail-fast: false @@ -13,5 +18,18 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Run redis docker-compose.integration + - name: Run redis docker-compose run: docker-compose --file=test/OpenTelemetry.Instrumentation.StackExchangeRedis.Tests/docker-compose.yml --file=build/docker-compose.${{ matrix.version }}.yml --project-directory=. up --exit-code-from=tests --build + + kafka-integration-test: + if: inputs.job == 'all' || inputs.job == 'kafka-integration-test' + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + version: [net6.0, net7.0, net8.0] + steps: + - uses: actions/checkout@v4 + + - name: Run kafka docker-compose + run: docker-compose --file=test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml --file=build/docker-compose.${{ matrix.version }}.yml --project-directory=. up --exit-code-from=tests --build From cdd2470cf59ee5fb12049ef2acf038b0f3a44040 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 25 Jun 2024 11:05:11 -0700 Subject: [PATCH 6/9] Lint. --- examples/kafka/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/kafka/README.md b/examples/kafka/README.md index b4349e238c..02f594480e 100644 --- a/examples/kafka/README.md +++ b/examples/kafka/README.md @@ -1,4 +1,4 @@ -# Run Examples.ConfluentKafka +# Run Examples.ConfluentKafka Start the Confluent Kafka stack: @@ -11,4 +11,3 @@ Start the Aspire Dashboard: ```cmd docker run --rm -it -p 18888:18888 -p 4317:18889 -d --name aspire-dashboard mcr.microsoft.com/dotnet/nightly/aspire-dashboard:8.0.0 ``` - From 5afe9e052ed64fe50e4c1bace90c43c977a5d47e Mon Sep 17 00:00:00 2001 From: g7ed6e Date: Wed, 26 Jun 2024 09:55:57 +0200 Subject: [PATCH 7/9] Fix integration tests --- .../HostedMeteringTests.cs | 10 +--------- .../HostedTracingAndMeteringTests.cs | 10 +--------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs index 0755ec41de..4dab3a3ebb 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs @@ -70,14 +70,6 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() await producer.FlushAsync(); } - host.Services.GetRequiredService().EnsureMetricsAreFlushed(); - - groups = metrics.GroupBy(x => x.Name).ToArray(); - - Assert.Equal(2, groups.Length); - - metrics.Clear(); - using (var consumer = host.Services.GetRequiredService>().Build()) { consumer.Subscribe(topic); @@ -108,6 +100,6 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() groups = metrics.GroupBy(x => x.Name).ToArray(); - Assert.Equal(2, groups.Length); + Assert.Equal(4, groups.Length); } } diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs index 1d7724e873..06ec8d08eb 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs @@ -80,14 +80,6 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() await producer.FlushAsync(); } - host.Services.GetRequiredService().EnsureMetricsAreFlushed(); - - groups = metrics.GroupBy(x => x.Name).ToArray(); - - Assert.Equal(2, groups.Length); - - metrics.Clear(); - using (var consumer = host.Services.GetRequiredService>().Build()) { consumer.Subscribe(topic); @@ -122,6 +114,6 @@ public async Task ResolveInstrumentedBuildersFromHostServiceProviderTest() groups = metrics.GroupBy(x => x.Name).ToArray(); - Assert.Equal(2, groups.Length); + Assert.Equal(4, groups.Length); } } From 4256bb1fba061e0f98a720aa7ace9f3a0e794fe8 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Wed, 26 Jun 2024 10:05:11 -0700 Subject: [PATCH 8/9] Lint. --- .../HostedMeteringTests.cs | 4 ---- .../HostedTracingAndMeteringTests.cs | 4 ---- .../HostedTracingTests.cs | 3 --- .../KafkaHelpers.cs | 2 -- .../MeteringTests.cs | 4 ---- .../ProducerExtensions.cs | 2 -- .../TracingTests.cs | 3 --- 7 files changed, 22 deletions(-) diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs index 4dab3a3ebb..f9f188f343 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedMeteringTests.cs @@ -1,10 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs index 06ec8d08eb..a2ed848d1c 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingAndMeteringTests.cs @@ -1,11 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; -using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs index 4bc03e4991..d47d93d14f 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/HostedTracingTests.cs @@ -1,10 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs index 2c375e0948..793753bdc2 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/KafkaHelpers.cs @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Threading.Tasks; using Confluent.Kafka; using OpenTelemetry.Tests; diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs index f3843ab3c9..eff25507ff 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/MeteringTests.cs @@ -1,10 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; using Confluent.Kafka; using OpenTelemetry.Metrics; using OpenTelemetry.Tests; diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs index 9066260da0..01168f5fe4 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/ProducerExtensions.cs @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Threading.Tasks; using Confluent.Kafka; namespace OpenTelemetry.Instrumentation.ConfluentKafka.Tests; diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs index 7d322862d7..923b4dae99 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/TracingTests.cs @@ -1,10 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Threading.Tasks; using Confluent.Kafka; using OpenTelemetry.Tests; using OpenTelemetry.Trace; From b429a6f44ea1601a6a4a9dedc123053b33b462e0 Mon Sep 17 00:00:00 2001 From: g7ed6e Date: Thu, 27 Jun 2024 10:35:30 +0200 Subject: [PATCH 9/9] Fix docker-compose CI integration tests --- .../docker-compose.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml index 4761a9f0bb..c749ab3632 100644 --- a/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml +++ b/test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/docker-compose.yml @@ -8,10 +8,10 @@ services: image: confluentinc/confluent-local environment: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT - KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://kafka:9092 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_INTERNAL://kafka:9092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092,PLAINTEXT_INTERNAL://kafka:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_INTERNAL://kafka:9093,PLAINTEXT_HOST://localhost:9092 ports: - - "9092:9092" + - "9093:9093" tests: build: @@ -19,6 +19,6 @@ services: dockerfile: ./test/OpenTelemetry.Instrumentation.ConfluentKafka.Tests/Dockerfile command: --TestCaseFilter:CategoryName=KafkaIntegrationTests environment: - - OTEL_KAFKAENDPOINT=kafka:9092 + - OTEL_KAFKAENDPOINT=kafka:9093 depends_on: - kafka