Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void Subscribe()

public void OnNext(DiagnosticListener value)
{
if ((Interlocked.Read(ref this.disposed) == 0) &&
if ((Volatile.Read(ref this.disposed) == 0) &&
this.diagnosticSourceFilter(value))
{
var handler = this.handlerFactory(value.Name);
Expand Down
4 changes: 1 addition & 3 deletions src/OpenTelemetry/Metrics/HistogramBuckets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ public class HistogramBuckets

internal double SnapshotSum;

internal int IsCriticalSectionOccupied = 0;

internal HistogramBuckets(double[] explicitBounds)
{
this.ExplicitBounds = explicitBounds;
this.RunningBucketCounts = explicitBounds != null ? new long[explicitBounds.Length + 1] : null;
this.SnapshotBucketCounts = explicitBounds != null ? new long[explicitBounds.Length + 1] : new long[0];
this.SnapshotBucketCounts = explicitBounds != null ? new long[explicitBounds.Length + 1] : Array.Empty<long>();
}

internal object LockObject => this.SnapshotBucketCounts;
Expand Down
132 changes: 59 additions & 73 deletions src/OpenTelemetry/Metrics/MetricPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,14 @@ internal void Update(long number)
}

case AggregationType.Histogram:
{
this.UpdateHistogram((double)number);
break;
}

case AggregationType.HistogramSumCount:
{
this.Update((double)number);
this.UpdateHistogramSumCount((double)number);
break;
}
}
Expand Down Expand Up @@ -324,58 +329,13 @@ internal void Update(double number)

case AggregationType.Histogram:
{
int i;
for (i = 0; i < this.histogramBuckets.ExplicitBounds.Length; i++)
{
// Upper bound is inclusive
if (number <= this.histogramBuckets.ExplicitBounds[i])
{
break;
}
}

var sw = default(SpinWait);
while (true)
{
if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
{
unchecked
{
this.runningValue.AsLong++;
this.histogramBuckets.RunningSum += number;
this.histogramBuckets.RunningBucketCounts[i]++;
}

this.histogramBuckets.IsCriticalSectionOccupied = 0;
break;
}

sw.SpinOnce();
}

this.UpdateHistogram(number);
break;
}

case AggregationType.HistogramSumCount:
{
var sw = default(SpinWait);
while (true)
{
if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
{
unchecked
{
this.runningValue.AsLong++;
this.histogramBuckets.RunningSum += number;
}

this.histogramBuckets.IsCriticalSectionOccupied = 0;
break;
}

sw.SpinOnce();
}

this.UpdateHistogramSumCount(number);
break;
}
}
Expand All @@ -401,23 +361,24 @@ internal void TakeSnapshot(bool outputDelta)
case AggregationType.LongSumIncomingDelta:
case AggregationType.LongSumIncomingCumulative:
{
long initValue = Volatile.Read(ref this.runningValue.AsLong);

if (outputDelta)
{
long initValue = Interlocked.Read(ref this.runningValue.AsLong);
this.snapshotValue.AsLong = initValue - this.deltaLastValue.AsLong;
this.deltaLastValue.AsLong = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.Read(ref this.runningValue.AsLong))
if (initValue != Volatile.Read(ref this.runningValue.AsLong))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
this.snapshotValue.AsLong = Interlocked.Read(ref this.runningValue.AsLong);
this.snapshotValue.AsLong = initValue;
}

break;
Expand All @@ -426,46 +387,39 @@ internal void TakeSnapshot(bool outputDelta)
case AggregationType.DoubleSumIncomingDelta:
case AggregationType.DoubleSumIncomingCumulative:
{
double initValue = Volatile.Read(ref this.runningValue.AsDouble);

if (outputDelta)
{
// TODO:
// Is this thread-safe way to read double?
// As long as the value is not -ve infinity,
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
double initValue = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
this.snapshotValue.AsDouble = initValue - this.deltaLastValue.AsDouble;
this.deltaLastValue.AsDouble = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity))
if (initValue != Volatile.Read(ref this.runningValue.AsDouble))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
// TODO:
// Is this thread-safe way to read double?
// As long as the value is not -ve infinity,
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
this.snapshotValue.AsDouble = initValue;
}

break;
}

case AggregationType.LongGauge:
{
this.snapshotValue.AsLong = Interlocked.Read(ref this.runningValue.AsLong);
long initValue = Volatile.Read(ref this.runningValue.AsLong);

this.snapshotValue.AsLong = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.snapshotValue.AsLong != Interlocked.Read(ref this.runningValue.AsLong))
if (initValue != Volatile.Read(ref this.runningValue.AsLong))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
Expand All @@ -475,17 +429,14 @@ internal void TakeSnapshot(bool outputDelta)

case AggregationType.DoubleGauge:
{
// TODO:
// Is this thread-safe way to read double?
// As long as the value is not -ve infinity,
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.snapshotValue.AsDouble = Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity);
double initValue = Volatile.Read(ref this.runningValue.AsDouble);

this.snapshotValue.AsDouble = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;

// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.snapshotValue.AsDouble != Interlocked.CompareExchange(ref this.runningValue.AsDouble, 0.0, double.NegativeInfinity))
if (initValue != Volatile.Read(ref this.runningValue.AsDouble))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
Expand Down Expand Up @@ -545,5 +496,40 @@ private readonly void ThrowNotSupportedMetricTypeException(string methodName)
{
throw new NotSupportedException($"{methodName} is not supported for this metric type.");
}

private void UpdateHistogram(double number)
{
int i;
for (i = 0; i < this.histogramBuckets.ExplicitBounds.Length; i++)
{
// Upper bound is inclusive
if (number <= this.histogramBuckets.ExplicitBounds[i])
{
break;
}
}

lock (this.histogramBuckets.LockObject)
{
unchecked
{
this.runningValue.AsLong++;
this.histogramBuckets.RunningSum += number;
this.histogramBuckets.RunningBucketCounts[i]++;
}
}
}

private void UpdateHistogramSumCount(double number)
{
lock (this.histogramBuckets.LockObject)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2951 showed good perf improvement when using own mechanism, as opposed to plain locks. If we modify the SnapShot path to also use the Interlock check for IsCriticalSectionOccupied, - would that not work??

cc : @utpilla

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cijothomas Possibly, but there be dragons. I'll paraphrase what @noahfalk told me.

Consider code like this...

   if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
   {
      this.runningValue.AsLong++;
      this.histogramBuckets.RunningSum += number;
      this.histogramBuckets.RunningBucketCounts[i]++;

      this.histogramBuckets.IsCriticalSectionOccupied = 0;
   }

The compiler is free/able (by spec) to rewrite that as:

   if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
   {
      this.histogramBuckets.IsCriticalSectionOccupied = 0; // Danger!

      this.runningValue.AsLong++;
      this.histogramBuckets.RunningSum += number;
      this.histogramBuckets.RunningBucketCounts[i]++;
   }

That is why it is risky to try and make our own mechanism. If we just use lock there, order is guaranteed to not change.

Copy link
Contributor

@utpilla utpilla Jul 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use Interlocked.Exchange (or maybe Volatile.Write) instead of simply assigning that to zero to avoid that risk.

There is an example on the docs for Interlocked that shows this this approach: https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=net-6.0#examples. This example does not even use Volatile.Write. It just uses an Interlocked.Exchange to update that value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@utpilla Let's say we did it like this...

if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
{
    Interlocked.Increment(ref this.runningValue.AsLong);
    Interlocked.Add(ref this.histogramBuckets.RunningSum, number); // Not possible with double, but for discussion sake
    Interlocked.Increment(ref this.histogramBuckets.RunningBucketCounts[i]);

    Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 0);
}

(Or different version with a mix of Interlocked vs Volatile.)

That would work (I think) but would also be a lot slower for the happy-path?

I thought maybe Thread.MemoryBarrier would help us. But the docs do recommend a lock over that 🤷

Let's say we did it like this...

if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
{
    unchecked
    {
        this.runningValue.AsLong++;
        this.histogramBuckets.RunningSum += number;
        this.histogramBuckets.RunningBucketCounts[i]++;
    }

    Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 0);
}

I don't think that works. Because the stuff in unchecked could be cached. Needs some kind of a memory fence. But I could be wrong this stuff is confusing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I suggested using Interlocked, I didn't mean that we should switch every statement to use Interlocked like shown here:

if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
{
    Interlocked.Increment(ref this.runningValue.AsLong);
    Interlocked.Add(ref this.histogramBuckets.RunningSum, number); // Not possible with double, but for discussion sake
    Interlocked.Increment(ref this.histogramBuckets.RunningBucketCounts[i]);

    Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 0);
}

I was indeed referring to this:

if (Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 1) == 0)
{
    unchecked
    {
        this.runningValue.AsLong++;
        this.histogramBuckets.RunningSum += number;
        this.histogramBuckets.RunningBucketCounts[i]++;
    }

    Interlocked.Exchange(ref this.histogramBuckets.IsCriticalSectionOccupied, 0);
}

I don't think that works. Because the stuff in unchecked could be cached. Needs some kind of a memory fence.

How does using a lock instead help with this? ^

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@utpilla

How does using a lock instead help with this?

Good question. It is magic 😄 @noahfalk Can you provide a little detail here? My understanding is that the compiler(?) treats the whole lock body as fenced/non-reordering?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be okay even if the instructions inside the unchecked block are re-ordered.

the stuff in unchecked could be cached

But does memory barrier/fence even help with caching/freshness?

{
unchecked
{
this.runningValue.AsLong++;
this.histogramBuckets.RunningSum += number;
}
}
}
}
}