Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ Notes](../../RELEASENOTES.md).

## Unreleased

* Fix observable instrument callbacks running once per reader instead of
once per collection cycle.
([#TODO](https://github.com/open-telemetry/opentelemetry-dotnet/pull/TODO))
Comment thread
martincostello marked this conversation as resolved.
Outdated

## 1.15.3

Released 2026-Apr-21
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ internal MeterProviderSdk(
this.Reader = new CompositeMetricReader([this.Reader, reader]);
}

this.Reader.SetParentProvider(this);

if (reader is PeriodicExportingMetricReader periodicExportingMetricReader)
{
exportersAdded.Append(periodicExportingMetricReader.Exporter);
Expand Down
15 changes: 15 additions & 0 deletions src/OpenTelemetry/Metrics/Reader/BaseExportingMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ internal override bool ProcessMetrics(in Batch<Metric> metrics, int timeoutMilli
}
}

/// <inheritdoc />
internal override bool OnCollectFromComposite(int timeoutMilliseconds)
{
if (this.SupportedExportModes.HasFlag(ExportModes.Push))
{
return base.OnCollectFromComposite(timeoutMilliseconds);
}
else if (this.SupportedExportModes.HasFlag(ExportModes.Pull) && PullMetricScope.IsPullAllowed)
{
return base.OnCollectFromComposite(timeoutMilliseconds);
}

return false;
}

/// <inheritdoc />
protected override bool OnCollect(int timeoutMilliseconds)
{
Expand Down
8 changes: 5 additions & 3 deletions src/OpenTelemetry/Metrics/Reader/CompositeMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,27 @@ internal override bool ProcessMetrics(in Batch<Metric> metrics, int timeoutMilli
}

/// <inheritdoc/>
protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite)
protected override bool OnCollect(int timeoutMilliseconds)
{
var result = true;
var sw = timeoutMilliseconds == Timeout.Infinite
? null
: Stopwatch.StartNew();

this.CollectObservableInstruments();

for (var cur = this.Head; cur != null; cur = cur.Next)
{
if (sw == null)
{
result = cur.Value.Collect(Timeout.Infinite) && result;
result = cur.Value.CollectFromComposite(Timeout.Infinite) && result;
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

// notify all the readers, even if we run overtime
result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result;
result = cur.Value.CollectFromComposite((int)Math.Max(timeout, 0)) && result;
}
}

Expand Down
233 changes: 137 additions & 96 deletions src/OpenTelemetry/Metrics/Reader/MetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,69 +120,7 @@ public MetricReaderTemporalityPreference TemporalityPreference
/// the semantic can be preserved.
/// </remarks>
public bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
Guard.ThrowIfInvalidTimeout(timeoutMilliseconds);

OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.Collect method called.");
var shouldRunCollect = false;
var tcs = this.collectionTcs;

if (tcs == null)
{
lock (this.newTaskLock)
{
tcs = this.collectionTcs;

if (tcs == null)
{
shouldRunCollect = true;
tcs = new TaskCompletionSource<bool>();
this.collectionTcs = tcs;
}
}
}

if (!shouldRunCollect)
{
return Task.WaitAny([tcs.Task, this.shutdownTcs.Task], timeoutMilliseconds) == 0 && tcs.Task.Result;
}

var result = false;
try
{
lock (this.onCollectLock)
{
result = this.OnCollect(timeoutMilliseconds);
}
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.MetricReaderException(nameof(this.Collect), ex);
}
finally
{
tcs.TrySetResult(result);

lock (this.newTaskLock)
{
if (ReferenceEquals(this.collectionTcs, tcs))
{
this.collectionTcs = null;
}
}
}

if (result)
{
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.Collect succeeded.");
}
else
{
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.Collect failed.");
}

return result;
}
=> this.Collect(timeoutMilliseconds, collectObservableInstruments: true);

/// <summary>
/// Attempts to shutdown the processor, blocks the current thread until
Expand Down Expand Up @@ -244,6 +182,12 @@ public void Dispose()
GC.SuppressFinalize(this);
}

internal bool CollectFromComposite(int timeoutMilliseconds = Timeout.Infinite)
=> this.Collect(timeoutMilliseconds, collectObservableInstruments: false);

internal void CollectObservableInstruments()
=> (this.parentProvider as MeterProviderSdk)?.CollectObservableInstruments();
Comment thread
martincostello marked this conversation as resolved.

internal virtual void SetParentProvider(BaseProvider parentProvider)
{
if (this.parentProvider != null && this.parentProvider != parentProvider)
Expand All @@ -269,6 +213,30 @@ internal virtual void SetParentProvider(BaseProvider parentProvider)
internal virtual bool ProcessMetrics(in Batch<Metric> metrics, int timeoutMilliseconds)
=> true;

/// <summary>
/// Called by <c>CollectFromComposite</c>. This function should block the
/// current thread until metrics collection completed, shutdown signaled
/// or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when metrics collection succeeded; otherwise,
/// <c>false</c>.
/// </returns>
internal virtual bool OnCollectFromComposite(int timeoutMilliseconds)
{
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.OnCollectFromComposite called.");

var sw = timeoutMilliseconds == Timeout.Infinite
? null
: Stopwatch.StartNew();

return this.ProcessMetricsCollection(sw, timeoutMilliseconds);
}

/// <summary>
/// Called by <c>Collect</c>. This function should block the current
/// thread until metrics collection completed, shutdown signaled or
Expand All @@ -294,11 +262,115 @@ protected virtual bool OnCollect(int timeoutMilliseconds)
? null
: Stopwatch.StartNew();

var meterProviderSdk = this.parentProvider as MeterProviderSdk;
meterProviderSdk?.CollectObservableInstruments();
this.CollectObservableInstruments();

OpenTelemetrySdkEventSource.Log.MetricReaderEvent("Observable instruments collected.");

return this.ProcessMetricsCollection(sw, timeoutMilliseconds);
}

/// <summary>
/// Called by <c>Shutdown</c>. This function should block the current
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
/// </returns>
/// <remarks>
/// This function is called synchronously on the thread which made the
/// first call to <c>Shutdown</c>. This function should not throw
/// exceptions.
/// </remarks>
protected virtual bool OnShutdown(int timeoutMilliseconds)
=> this.Collect(timeoutMilliseconds);

/// <summary>
/// Releases the unmanaged resources used by this class and optionally
/// releases the managed resources.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> to release both managed and unmanaged resources;
/// <see langword="false"/> to release only unmanaged resources.
/// </param>
protected virtual void Dispose(bool disposing)
{
}

private bool Collect(int timeoutMilliseconds, bool collectObservableInstruments)
{
Guard.ThrowIfInvalidTimeout(timeoutMilliseconds);

OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.Collect method called.");
var shouldRunCollect = false;
var tcs = this.collectionTcs;

if (tcs == null)
{
lock (this.newTaskLock)
{
tcs = this.collectionTcs;

if (tcs == null)
{
shouldRunCollect = true;
tcs = new TaskCompletionSource<bool>();
this.collectionTcs = tcs;
}
}
}

if (!shouldRunCollect)
{
return Task.WaitAny([tcs.Task, this.shutdownTcs.Task], timeoutMilliseconds) == 0 && tcs.Task.Result;
}

var result = false;
try
{
lock (this.onCollectLock)
{
result = collectObservableInstruments
? this.OnCollect(timeoutMilliseconds)
: this.OnCollectFromComposite(timeoutMilliseconds);
}
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.MetricReaderException(nameof(this.Collect), ex);
}
finally
{
tcs.TrySetResult(result);

lock (this.newTaskLock)
{
if (ReferenceEquals(this.collectionTcs, tcs))
{
this.collectionTcs = null;
}
}
}

if (result)
{
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.Collect succeeded.");
}
else
{
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("MetricReader.Collect failed.");
}

return result;
}

private bool ProcessMetricsCollection(Stopwatch? sw, int timeoutMilliseconds)
{
OpenTelemetrySdkEventSource.Log.MetricReaderEvent("ProcessMetricsCollection called.");

var metrics = this.GetMetricsBatch();

bool result;
Expand Down Expand Up @@ -341,35 +413,4 @@ protected virtual bool OnCollect(int timeoutMilliseconds)
return result;
}
}

/// <summary>
/// Called by <c>Shutdown</c>. This function should block the current
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
/// </returns>
/// <remarks>
/// This function is called synchronously on the thread which made the
/// first call to <c>Shutdown</c>. This function should not throw
/// exceptions.
/// </remarks>
protected virtual bool OnShutdown(int timeoutMilliseconds)
=> this.Collect(timeoutMilliseconds);

/// <summary>
/// Releases the unmanaged resources used by this class and optionally
/// releases the managed resources.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> to release both managed and unmanaged resources;
/// <see langword="false"/> to release only unmanaged resources.
/// </param>
protected virtual void Dispose(bool disposing)
{
}
}
Loading
Loading