diff --git a/src/OpenTelemetry.Sampler.AWS/AWSXRayRemoteSampler.cs b/src/OpenTelemetry.Sampler.AWS/AWSXRayRemoteSampler.cs index f5652225f7..3e28f32868 100644 --- a/src/OpenTelemetry.Sampler.AWS/AWSXRayRemoteSampler.cs +++ b/src/OpenTelemetry.Sampler.AWS/AWSXRayRemoteSampler.cs @@ -14,8 +14,13 @@ public sealed class AWSXRayRemoteSampler : Trace.Sampler, IDisposable { internal static readonly TimeSpan DefaultTargetInterval = TimeSpan.FromSeconds(10); + private const string ClientIdCharacters = "0123456789abcdef"; + private static readonly Random Random = new(); + private readonly CancellationTokenSource cancellationTokenSource = new(); + private readonly SemaphoreSlim pollerLock = new(1, 1); private bool isFallBackEventToWriteSwitch = true; + private int disposed; [SuppressMessage("Performance", "CA5394: Do not use insecure randomness", Justification = "Secure random is not required for jitters.")] internal AWSXRayRemoteSampler(Resource resource, TimeSpan pollingInterval, string endpoint, Clock clock) @@ -73,10 +78,7 @@ internal AWSXRayRemoteSampler(Resource resource, TimeSpan pollingInterval, strin /// to identify the service attributes for sampling. This resource should /// be the same as what the OpenTelemetry SDK is configured with. /// an instance of . - public static AWSXRayRemoteSamplerBuilder Builder(Resource resource) - { - return new AWSXRayRemoteSamplerBuilder(resource); - } + public static AWSXRayRemoteSamplerBuilder Builder(Resource resource) => new(resource); /// public override SamplingResult ShouldSample(in SamplingParameters samplingParameters) @@ -105,53 +107,54 @@ public void Dispose() GC.SuppressFinalize(this); } - [SuppressMessage( - "Usage", - "CA5394: Do not use insecure randomness", - Justification = "using insecure random is fine here since clientId doesn't need to be secure.")] - private static string GenerateClientId() + internal async Task ExecutePollAsync(Func pollAsync) { - char[] hex = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f']; - var clientIdChars = new char[24]; - for (var i = 0; i < clientIdChars.Length; i++) + var lockTaken = false; + + try { - clientIdChars[i] = hex[Random.Next(hex.Length)]; - } + await this.pollerLock.WaitAsync(this.cancellationTokenSource.Token).ConfigureAwait(false); + lockTaken = true; - return new string(clientIdChars); - } + if (Volatile.Read(ref this.disposed) != 0) + { + return; + } - private void Dispose(bool disposing) - { - if (disposing) + await pollAsync(this.cancellationTokenSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (this.cancellationTokenSource.IsCancellationRequested) { - this.RulePollerTimer?.Dispose(); - this.Client?.Dispose(); - this.RulesCache?.Dispose(); + // Sampler is shutting down. + } + catch (ObjectDisposedException) when (Volatile.Read(ref this.disposed) != 0) + { + // Sampler is shutting down. + } + catch (Exception ex) + { + AWSSamplerEventSource.Log.ExceptionFromSampler(ex.Message); + } + finally + { + if (lockTaken) + { + this.pollerLock.Release(); + } } } - private async void GetAndUpdateRules(object? state) - { - var rules = await this.Client.GetSamplingRules().ConfigureAwait(false); - - this.RulesCache.UpdateRules(rules); - - // schedule the next rule poll. - this.RulePollerTimer.Change(this.PollingInterval.Add(this.RulePollerJitter), Timeout.InfiniteTimeSpan); - } - - private async void GetAndUpdateTargets(object? state) + internal async Task GetAndUpdateTargetsAsync(CancellationToken cancellationToken) { - await this.GetAndUpdateTargetsAsync().ConfigureAwait(false); - } + cancellationToken.ThrowIfCancellationRequested(); - private async Task GetAndUpdateTargetsAsync() - { var statistics = this.RulesCache.Snapshot(this.Clock.Now()); var request = new GetSamplingTargetsRequest(statistics); - var response = await this.Client.GetSamplingTargets(request).ConfigureAwait(false); + var response = await this.Client.GetSamplingTargets(request, cancellationToken).ConfigureAwait(false); + + cancellationToken.ThrowIfCancellationRequested(); + if (response != null) { Dictionary targets = []; @@ -169,7 +172,8 @@ private async Task GetAndUpdateTargetsAsync() { var lastRuleModificationTime = this.Clock.ToDateTime(response.LastRuleModification); - if (lastRuleModificationTime > this.RulesCache.GetUpdatedAt()) + if (!cancellationToken.IsCancellationRequested && + lastRuleModificationTime > this.RulesCache.GetUpdatedAt()) { // rules have been updated. fetch the new ones right away. this.RulePollerTimer.Change(TimeSpan.Zero, Timeout.InfiniteTimeSpan); @@ -185,6 +189,100 @@ private async Task GetAndUpdateTargetsAsync() nextTargetFetchInterval = DefaultTargetInterval; } - this.TargetPollerTimer.Change(nextTargetFetchInterval.Add(this.TargetPollerJitter), Timeout.InfiniteTimeSpan); + if (!cancellationToken.IsCancellationRequested) + { + this.TargetPollerTimer.Change(nextTargetFetchInterval.Add(this.TargetPollerJitter), Timeout.InfiniteTimeSpan); + } + } + + [SuppressMessage( + "Usage", + "CA5394: Do not use insecure randomness", + Justification = "using insecure random is fine here since clientId doesn't need to be secure.")] + private static string GenerateClientId() + { + const int ClientIdLength = 24; + +#if NET + Span buffer = stackalloc char[ClientIdLength]; + + Random.GetItems(ClientIdCharacters, buffer); + + return new(buffer); +#else + var buffer = new char[ClientIdLength]; + for (var i = 0; i < buffer.Length; i++) + { + buffer[i] = ClientIdCharacters[Random.Next(ClientIdCharacters.Length)]; + } + + return new string(buffer); +#endif + } + + private static void DisposeTimer(Timer? timer) + { + if (timer == null) + { + return; + } + + using var disposedEvent = new ManualResetEvent(false); + if (timer.Dispose(disposedEvent)) + { + disposedEvent.WaitOne(); + } + } + + private void Dispose(bool disposing) + { + if (disposing) + { + if (Interlocked.Exchange(ref this.disposed, 1) != 0) + { + return; + } + + this.cancellationTokenSource.Cancel(); + DisposeTimer(this.RulePollerTimer); + DisposeTimer(this.TargetPollerTimer); + + this.pollerLock.Wait(); + + try + { + this.Client?.Dispose(); + this.RulesCache?.Dispose(); + } + finally + { + this.pollerLock.Release(); + this.pollerLock.Dispose(); + this.cancellationTokenSource.Dispose(); + } + } + } + + private void GetAndUpdateRules(object? state) => + _ = this.ExecutePollAsync(this.GetAndUpdateRulesAsync); + + private void GetAndUpdateTargets(object? state) => + _ = this.ExecutePollAsync(this.GetAndUpdateTargetsAsync); + + private async Task GetAndUpdateRulesAsync(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + var rules = await this.Client.GetSamplingRules(cancellationToken).ConfigureAwait(false); + + cancellationToken.ThrowIfCancellationRequested(); + + this.RulesCache.UpdateRules(rules); + + if (!cancellationToken.IsCancellationRequested) + { + // schedule the next rule poll. + this.RulePollerTimer.Change(this.PollingInterval.Add(this.RulePollerJitter), Timeout.InfiniteTimeSpan); + } } } diff --git a/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs b/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs index 194358052c..4a3de6b062 100644 --- a/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs +++ b/test/OpenTelemetry.Instrumentation.AspNetCore.Tests/BasicTests.cs @@ -1326,7 +1326,7 @@ private static void WaitForActivityExport(List exportedItems, int coun Thread.Sleep(10); return exportedItems.Count >= count; }, - TimeSpan.FromSeconds(1)), + TimeSpan.FromSeconds(5)), $"Actual: {exportedItems.Count} Expected: {count}"); private static void ValidateAspNetCoreActivity(Activity activityToValidate, string expectedHttpPath) diff --git a/test/OpenTelemetry.Sampler.AWS.Tests/TestAWSXRayRemoteSampler.cs b/test/OpenTelemetry.Sampler.AWS.Tests/TestAWSXRayRemoteSampler.cs index 4c2a7e3a2f..09b6c1c244 100644 --- a/test/OpenTelemetry.Sampler.AWS.Tests/TestAWSXRayRemoteSampler.cs +++ b/test/OpenTelemetry.Sampler.AWS.Tests/TestAWSXRayRemoteSampler.cs @@ -22,14 +22,12 @@ public void TestSamplerWithConfiguration() .SetEndpoint(endpoint) .Build(); - var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance); - - var xraySampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(parentBasedSampler); + using var xraySampler = GetRemoteSampler(parentBasedSampler); - Assert.Equal(pollingInterval, xraySampler?.PollingInterval); - Assert.Equal(endpoint, xraySampler?.Endpoint); - Assert.NotNull(xraySampler?.RulePollerTimer); - Assert.NotNull(xraySampler?.Client); + Assert.Equal(pollingInterval, xraySampler.PollingInterval); + Assert.Equal(endpoint, xraySampler.Endpoint); + Assert.NotNull(xraySampler.RulePollerTimer); + Assert.NotNull(xraySampler.Client); } [Fact] @@ -37,14 +35,12 @@ public void TestSamplerWithDefaults() { var parentBasedSampler = AWSXRayRemoteSampler.Builder(ResourceBuilder.CreateEmpty().Build()).Build(); - var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance); - - var xraySampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(parentBasedSampler); + using var xraySampler = GetRemoteSampler(parentBasedSampler); - Assert.Equal(TimeSpan.FromMinutes(5), xraySampler?.PollingInterval); - Assert.Equal("http://localhost:2000", xraySampler?.Endpoint); - Assert.NotNull(xraySampler?.RulePollerTimer); - Assert.NotNull(xraySampler?.Client); + Assert.Equal(TimeSpan.FromMinutes(5), xraySampler.PollingInterval); + Assert.Equal("http://localhost:2000", xraySampler.Endpoint); + Assert.NotNull(xraySampler.RulePollerTimer); + Assert.NotNull(xraySampler.Client); } [Fact] @@ -66,6 +62,8 @@ public async Task TestSamplerUpdateAndSample() .SetClock(clock) .Build(); + using var remoteSampler = GetRemoteSampler(sampler); + // the sampler will use fallback sampler until rules are fetched. Assert.Equal(SamplingDecision.RecordAndSample, this.DoSample(sampler, "cat-service")); @@ -122,26 +120,48 @@ public async Task TestSamplerUpdateTargetsWithMissingTargetDocumentsDoesNotThrow .SetClock(clock) .Build(); - var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance); - var sampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(parentBasedSampler); - - Assert.NotNull(sampler); + using var sampler = GetRemoteSampler(parentBasedSampler); requestHandler.SetResponse("/SamplingTargets", "{\"LastRuleModification\":1530920505.0}"); - var getAndUpdateTargetsAsyncMethod = typeof(AWSXRayRemoteSampler).GetMethod("GetAndUpdateTargetsAsync", BindingFlags.NonPublic | BindingFlags.Instance); - var getAndUpdateTargetsAsyncTask = (Task?)getAndUpdateTargetsAsyncMethod?.Invoke(sampler, null); + await sampler.GetAndUpdateTargetsAsync(CancellationToken.None); + } - Assert.NotNull(getAndUpdateTargetsAsyncTask); + [Fact] + public async Task ExecutePollAsyncDoesNotBlockCaller() + { + using var sampler = GetRemoteSampler(AWSXRayRemoteSampler.Builder(ResourceBuilder.CreateEmpty().Build()).Build()); - try - { - await getAndUpdateTargetsAsyncTask!; - } - finally + var pollStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var releasePoll = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var executePollAsyncMethod = typeof(AWSXRayRemoteSampler).GetMethod("ExecutePollAsync", BindingFlags.NonPublic | BindingFlags.Instance); + + Assert.NotNull(executePollAsyncMethod); + + Task PollAsync(CancellationToken cancellationToken) { - sampler.Dispose(); + pollStarted.TrySetResult(true); + cancellationToken.Register(() => releasePoll.TrySetCanceled(cancellationToken)); + return releasePoll.Task; } + + var stopwatch = Stopwatch.StartNew(); + var executePollTask = sampler.ExecutePollAsync(PollAsync); + stopwatch.Stop(); + + await pollStarted.Task; + Assert.True(stopwatch.Elapsed < TimeSpan.FromSeconds(1), $"Expected ExecutePollAsync to return without waiting for the poll to finish, but it took {stopwatch.Elapsed}."); + + releasePoll.TrySetResult(true); + await executePollTask; + } + + private static AWSXRayRemoteSampler GetRemoteSampler(Trace.Sampler sampler) + { + var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance); + var remoteSampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(sampler); + + return remoteSampler ?? throw new InvalidOperationException("Unable to get AWSXRayRemoteSampler from ParentBasedSampler."); } private SamplingDecision DoSample(Trace.Sampler sampler, string serviceName) diff --git a/test/Shared/TestHttpServer.cs b/test/Shared/TestHttpServer.cs index 3ace5275b0..333ea62520 100644 --- a/test/Shared/TestHttpServer.cs +++ b/test/Shared/TestHttpServer.cs @@ -105,7 +105,7 @@ private static bool IsResponseAlreadyClosedException(Exception exception) return true; } - if (ex is HttpListenerException httpEx && (httpEx.ErrorCode is 1 or 6 or 995 or 10057)) + if (ex is HttpListenerException httpEx && (httpEx.ErrorCode is 1 or 6 or 995 or 1229 or 10057)) { return true; }