Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 138 additions & 40 deletions src/OpenTelemetry.Sampler.AWS/AWSXRayRemoteSampler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ public sealed class AWSXRayRemoteSampler : Trace.Sampler, IDisposable
{
internal static readonly TimeSpan DefaultTargetInterval = TimeSpan.FromSeconds(10);

private const string ClientIdCharacters = "0123456789abcdef";

private static readonly Random Random = new();
private readonly CancellationTokenSource cancellationTokenSource = new();
private readonly SemaphoreSlim pollerLock = new(1, 1);
private bool isFallBackEventToWriteSwitch = true;
private int disposed;

[SuppressMessage("Performance", "CA5394: Do not use insecure randomness", Justification = "Secure random is not required for jitters.")]
internal AWSXRayRemoteSampler(Resource resource, TimeSpan pollingInterval, string endpoint, Clock clock)
Expand Down Expand Up @@ -73,10 +78,7 @@ internal AWSXRayRemoteSampler(Resource resource, TimeSpan pollingInterval, strin
/// to identify the service attributes for sampling. This resource should
/// be the same as what the OpenTelemetry SDK is configured with.</param>
/// <returns>an instance of <see cref="AWSXRayRemoteSamplerBuilder"/>.</returns>
public static AWSXRayRemoteSamplerBuilder Builder(Resource resource)
{
return new AWSXRayRemoteSamplerBuilder(resource);
}
public static AWSXRayRemoteSamplerBuilder Builder(Resource resource) => new(resource);

/// <inheritdoc/>
public override SamplingResult ShouldSample(in SamplingParameters samplingParameters)
Expand Down Expand Up @@ -105,53 +107,54 @@ public void Dispose()
GC.SuppressFinalize(this);
}

[SuppressMessage(
"Usage",
"CA5394: Do not use insecure randomness",
Justification = "using insecure random is fine here since clientId doesn't need to be secure.")]
private static string GenerateClientId()
internal async Task ExecutePollAsync(Func<CancellationToken, Task> pollAsync)
{
char[] hex = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'];
var clientIdChars = new char[24];
for (var i = 0; i < clientIdChars.Length; i++)
var lockTaken = false;

try
{
clientIdChars[i] = hex[Random.Next(hex.Length)];
}
await this.pollerLock.WaitAsync(this.cancellationTokenSource.Token).ConfigureAwait(false);
lockTaken = true;

return new string(clientIdChars);
}
if (Volatile.Read(ref this.disposed) != 0)
{
return;
}

private void Dispose(bool disposing)
{
if (disposing)
await pollAsync(this.cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (this.cancellationTokenSource.IsCancellationRequested)
{
this.RulePollerTimer?.Dispose();
this.Client?.Dispose();
this.RulesCache?.Dispose();
// Sampler is shutting down.
}
catch (ObjectDisposedException) when (Volatile.Read(ref this.disposed) != 0)
{
// Sampler is shutting down.
}
catch (Exception ex)
{
AWSSamplerEventSource.Log.ExceptionFromSampler(ex.Message);
}
finally
{
if (lockTaken)
{
this.pollerLock.Release();
}
}
}

private async void GetAndUpdateRules(object? state)
{
var rules = await this.Client.GetSamplingRules().ConfigureAwait(false);

this.RulesCache.UpdateRules(rules);

// schedule the next rule poll.
this.RulePollerTimer.Change(this.PollingInterval.Add(this.RulePollerJitter), Timeout.InfiniteTimeSpan);
}

private async void GetAndUpdateTargets(object? state)
internal async Task GetAndUpdateTargetsAsync(CancellationToken cancellationToken)
{
await this.GetAndUpdateTargetsAsync().ConfigureAwait(false);
}
cancellationToken.ThrowIfCancellationRequested();

private async Task GetAndUpdateTargetsAsync()
{
var statistics = this.RulesCache.Snapshot(this.Clock.Now());

var request = new GetSamplingTargetsRequest(statistics);
var response = await this.Client.GetSamplingTargets(request).ConfigureAwait(false);
var response = await this.Client.GetSamplingTargets(request, cancellationToken).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();

if (response != null)
{
Dictionary<string, SamplingTargetDocument> targets = [];
Expand All @@ -169,7 +172,8 @@ private async Task GetAndUpdateTargetsAsync()
{
var lastRuleModificationTime = this.Clock.ToDateTime(response.LastRuleModification);

if (lastRuleModificationTime > this.RulesCache.GetUpdatedAt())
if (!cancellationToken.IsCancellationRequested &&
lastRuleModificationTime > this.RulesCache.GetUpdatedAt())
{
// rules have been updated. fetch the new ones right away.
this.RulePollerTimer.Change(TimeSpan.Zero, Timeout.InfiniteTimeSpan);
Expand All @@ -185,6 +189,100 @@ private async Task GetAndUpdateTargetsAsync()
nextTargetFetchInterval = DefaultTargetInterval;
}

this.TargetPollerTimer.Change(nextTargetFetchInterval.Add(this.TargetPollerJitter), Timeout.InfiniteTimeSpan);
if (!cancellationToken.IsCancellationRequested)
{
this.TargetPollerTimer.Change(nextTargetFetchInterval.Add(this.TargetPollerJitter), Timeout.InfiniteTimeSpan);
}
}

/// <summary>
/// Creates a random 24-character client identifier whose characters are
/// drawn from <see cref="ClientIdCharacters"/>.
/// </summary>
/// <returns>The newly generated identifier.</returns>
[SuppressMessage(
    "Usage",
    "CA5394: Do not use insecure randomness",
    Justification = "using insecure random is fine here since clientId doesn't need to be secure.")]
private static string GenerateClientId()
{
    const int ClientIdLength = 24;

#if NET
    // Fill a stack-allocated buffer with random picks in a single call.
    Span<char> clientId = stackalloc char[ClientIdLength];
    Random.GetItems(ClientIdCharacters, clientId);

    return new string(clientId);
#else
    // Older targets: pick one character at a time.
    var clientId = new char[ClientIdLength];
    var alphabetSize = ClientIdCharacters.Length;
    var position = 0;

    while (position < clientId.Length)
    {
        clientId[position] = ClientIdCharacters[Random.Next(alphabetSize)];
        position++;
    }

    return new string(clientId);
#endif
}

/// <summary>
/// Disposes <paramref name="timer"/> and, when the timer accepts the
/// notification handle, blocks until that handle is signaled.
/// </summary>
/// <param name="timer">The timer to dispose; <c>null</c> is ignored.</param>
private static void DisposeTimer(Timer? timer)
{
    if (timer is not null)
    {
        using var disposalCompleted = new ManualResetEvent(false);

        // Only wait when Dispose reports success; otherwise the handle
        // may never be signaled and waiting would block forever.
        var signalPending = timer.Dispose(disposalCompleted);
        if (signalPending)
        {
            disposalCompleted.WaitOne();
        }
    }
}

/// <summary>
/// Shuts the sampler down: cancels outstanding work, stops both poller
/// timers, waits for the poller lock and then releases owned resources.
/// </summary>
/// <param name="disposing">
/// <c>true</c> to release managed resources; when <c>false</c> nothing is done.
/// </param>
private void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    // Only the first caller gets past this point; later calls are no-ops.
    var alreadyDisposed = Interlocked.Exchange(ref this.disposed, 1) != 0;
    if (alreadyDisposed)
    {
        return;
    }

    // Cancel first so a poll that is in flight can unwind, then stop the
    // timers so no further callbacks are started.
    this.cancellationTokenSource.Cancel();
    DisposeTimer(this.RulePollerTimer);
    DisposeTimer(this.TargetPollerTimer);

    // ExecutePollAsync holds this lock while a poll runs, so taking it here
    // means no poll is using the client or the rules cache any more.
    this.pollerLock.Wait();

    try
    {
        this.Client?.Dispose();
        this.RulesCache?.Dispose();
    }
    finally
    {
        this.pollerLock.Release();
        this.pollerLock.Dispose();
        this.cancellationTokenSource.Dispose();
    }
}

/// <summary>
/// Starts a sampling-rules poll through <see cref="ExecutePollAsync"/>
/// without waiting for it to finish.
/// </summary>
/// <param name="state">Unused.</param>
private void GetAndUpdateRules(object? state)
{
    // Fire-and-forget: the returned task is deliberately discarded.
    // NOTE(review): presumably wired up as the RulePollerTimer callback - the
    // timer construction is not visible here.
    _ = this.ExecutePollAsync(this.GetAndUpdateRulesAsync);
}

/// <summary>
/// Starts a sampling-targets poll through <see cref="ExecutePollAsync"/>
/// without waiting for it to finish.
/// </summary>
/// <param name="state">Unused.</param>
private void GetAndUpdateTargets(object? state)
{
    // Fire-and-forget: the returned task is deliberately discarded.
    // NOTE(review): presumably wired up as the TargetPollerTimer callback - the
    // timer construction is not visible here.
    _ = this.ExecutePollAsync(this.GetAndUpdateTargetsAsync);
}

/// <summary>
/// Fetches the sampling rules from the client, stores them in the rules
/// cache and schedules the next rules poll.
/// </summary>
/// <param name="cancellationToken">Token that is signaled when the sampler shuts down.</param>
/// <returns>A task that completes when the poll has finished.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="cancellationToken"/> is canceled before or during the poll.
/// </exception>
private async Task GetAndUpdateRulesAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    try
    {
        var rules = await this.Client.GetSamplingRules(cancellationToken).ConfigureAwait(false);

        // Do not touch the cache if shutdown started while the request was in flight.
        cancellationToken.ThrowIfCancellationRequested();

        this.RulesCache.UpdateRules(rules);
    }
    finally
    {
        // The timer is one-shot (infinite period), so it must be re-armed after
        // every poll - including a failed one - or rule polling would stop
        // permanently after the first error. Skip re-arming during shutdown.
        if (!cancellationToken.IsCancellationRequested)
        {
            // schedule the next rule poll.
            this.RulePollerTimer.Change(this.PollingInterval.Add(this.RulePollerJitter), Timeout.InfiniteTimeSpan);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ private static void WaitForActivityExport(List<Activity> exportedItems, int coun
Thread.Sleep(10);
return exportedItems.Count >= count;
},
TimeSpan.FromSeconds(1)),
TimeSpan.FromSeconds(5)),
$"Actual: {exportedItems.Count} Expected: {count}");

private static void ValidateAspNetCoreActivity(Activity activityToValidate, string expectedHttpPath)
Expand Down
74 changes: 47 additions & 27 deletions test/OpenTelemetry.Sampler.AWS.Tests/TestAWSXRayRemoteSampler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,25 @@ public void TestSamplerWithConfiguration()
.SetEndpoint(endpoint)
.Build();

var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance);

var xraySampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(parentBasedSampler);
using var xraySampler = GetRemoteSampler(parentBasedSampler);

Assert.Equal(pollingInterval, xraySampler?.PollingInterval);
Assert.Equal(endpoint, xraySampler?.Endpoint);
Assert.NotNull(xraySampler?.RulePollerTimer);
Assert.NotNull(xraySampler?.Client);
Assert.Equal(pollingInterval, xraySampler.PollingInterval);
Assert.Equal(endpoint, xraySampler.Endpoint);
Assert.NotNull(xraySampler.RulePollerTimer);
Assert.NotNull(xraySampler.Client);
}

[Fact]
public void TestSamplerWithDefaults()
{
var parentBasedSampler = AWSXRayRemoteSampler.Builder(ResourceBuilder.CreateEmpty().Build()).Build();

var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance);

var xraySampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(parentBasedSampler);
using var xraySampler = GetRemoteSampler(parentBasedSampler);

Assert.Equal(TimeSpan.FromMinutes(5), xraySampler?.PollingInterval);
Assert.Equal("http://localhost:2000", xraySampler?.Endpoint);
Assert.NotNull(xraySampler?.RulePollerTimer);
Assert.NotNull(xraySampler?.Client);
Assert.Equal(TimeSpan.FromMinutes(5), xraySampler.PollingInterval);
Assert.Equal("http://localhost:2000", xraySampler.Endpoint);
Assert.NotNull(xraySampler.RulePollerTimer);
Assert.NotNull(xraySampler.Client);
}

[Fact]
Expand All @@ -66,6 +62,8 @@ public async Task TestSamplerUpdateAndSample()
.SetClock(clock)
.Build();

using var remoteSampler = GetRemoteSampler(sampler);

// the sampler will use fallback sampler until rules are fetched.
Assert.Equal(SamplingDecision.RecordAndSample, this.DoSample(sampler, "cat-service"));

Expand Down Expand Up @@ -122,26 +120,48 @@ public async Task TestSamplerUpdateTargetsWithMissingTargetDocumentsDoesNotThrow
.SetClock(clock)
.Build();

var rootSamplerFieldInfo = typeof(ParentBasedSampler).GetField("rootSampler", BindingFlags.NonPublic | BindingFlags.Instance);
var sampler = (AWSXRayRemoteSampler?)rootSamplerFieldInfo?.GetValue(parentBasedSampler);

Assert.NotNull(sampler);
using var sampler = GetRemoteSampler(parentBasedSampler);

requestHandler.SetResponse("/SamplingTargets", "{\"LastRuleModification\":1530920505.0}");

var getAndUpdateTargetsAsyncMethod = typeof(AWSXRayRemoteSampler).GetMethod("GetAndUpdateTargetsAsync", BindingFlags.NonPublic | BindingFlags.Instance);
var getAndUpdateTargetsAsyncTask = (Task?)getAndUpdateTargetsAsyncMethod?.Invoke(sampler, null);
await sampler.GetAndUpdateTargetsAsync(CancellationToken.None);
}

Assert.NotNull(getAndUpdateTargetsAsyncTask);
[Fact]
public async Task ExecutePollAsyncDoesNotBlockCaller()
{
using var sampler = GetRemoteSampler(AWSXRayRemoteSampler.Builder(ResourceBuilder.CreateEmpty().Build()).Build());

try
{
await getAndUpdateTargetsAsyncTask!;
}
finally
var pollStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var releasePoll = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var executePollAsyncMethod = typeof(AWSXRayRemoteSampler).GetMethod("ExecutePollAsync", BindingFlags.NonPublic | BindingFlags.Instance);

Assert.NotNull(executePollAsyncMethod);

Task PollAsync(CancellationToken cancellationToken)
{
sampler.Dispose();
pollStarted.TrySetResult(true);
cancellationToken.Register(() => releasePoll.TrySetCanceled(cancellationToken));
return releasePoll.Task;
}

var stopwatch = Stopwatch.StartNew();
var executePollTask = sampler.ExecutePollAsync(PollAsync);
stopwatch.Stop();

await pollStarted.Task;
Assert.True(stopwatch.Elapsed < TimeSpan.FromSeconds(1), $"Expected ExecutePollAsync to return without waiting for the poll to finish, but it took {stopwatch.Elapsed}.");

releasePoll.TrySetResult(true);
await executePollTask;
}

/// <summary>
/// Reads the private <c>rootSampler</c> field of a <see cref="ParentBasedSampler"/>
/// via reflection and returns it as an <see cref="AWSXRayRemoteSampler"/>.
/// </summary>
/// <param name="sampler">The sampler whose root sampler is extracted.</param>
/// <returns>The wrapped remote sampler.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when the field cannot be found or holds <c>null</c>.
/// </exception>
private static AWSXRayRemoteSampler GetRemoteSampler(Trace.Sampler sampler)
{
    const BindingFlags PrivateInstance = BindingFlags.NonPublic | BindingFlags.Instance;

    var rootSamplerField = typeof(ParentBasedSampler).GetField("rootSampler", PrivateInstance);
    var rootSampler = rootSamplerField?.GetValue(sampler);

    if (rootSampler is null)
    {
        throw new InvalidOperationException("Unable to get AWSXRayRemoteSampler from ParentBasedSampler.");
    }

    // Direct cast on purpose: an unexpected sampler type surfaces as InvalidCastException.
    return (AWSXRayRemoteSampler)rootSampler;
}

private SamplingDecision DoSample(Trace.Sampler sampler, string serviceName)
Expand Down
2 changes: 1 addition & 1 deletion test/Shared/TestHttpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private static bool IsResponseAlreadyClosedException(Exception exception)
return true;
}

if (ex is HttpListenerException httpEx && (httpEx.ErrorCode is 1 or 6 or 995 or 10057))
if (ex is HttpListenerException httpEx && (httpEx.ErrorCode is 1 or 6 or 995 or 1229 or 10057))
{
return true;
}
Expand Down
Loading