Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,6 @@ public class EventHubQueueCacheFactory : IEventHubQueueCacheFactory
/// Constructor for EventHubQueueCacheFactory
/// </summary>
public EventHubQueueCacheFactory(
EventHubStreamCachePressureOptions cacheOptions,
StreamCacheEvictionOptions evictionOptions,
StreamStatisticOptions statisticOptions,
IEventHubDataAdapter dataAdater,
EventHubMonitorAggregationDimensions sharedDimensions,
Func<EventHubCacheMonitorDimensions, ILoggerFactory, ICacheMonitor> cacheMonitorFactory = null,
Func<EventHubBlockPoolMonitorDimensions, ILoggerFactory, IBlockPoolMonitor> blockPoolMonitorFactory = null)
: this(cacheOptions, evictionOptions, statisticOptions, dataAdater, sharedDimensions, null, cacheMonitorFactory, blockPoolMonitorFactory)
{
}

internal EventHubQueueCacheFactory(
EventHubStreamCachePressureOptions cacheOptions,
StreamCacheEvictionOptions evictionOptions,
StreamStatisticOptions statisticOptions,
Expand All @@ -68,16 +56,10 @@ internal EventHubQueueCacheFactory(
this.timePurge = new TimePurgePredicate(evictionOptions.DataMinTimeInCache, evictionOptions.DataMaxAgeInCache);
this.sharedDimensions = sharedDimensions;
this.orleansInstruments = instruments;
this.CacheMonitorFactory = cacheMonitorFactory ?? CreateDefaultCacheMonitor;
this.BlockPoolMonitorFactory = blockPoolMonitorFactory ?? CreateDefaultBlockPoolMonitor;
this.CacheMonitorFactory = cacheMonitorFactory ?? ((dimensions, logger) => new DefaultEventHubCacheMonitor(dimensions, this.orleansInstruments));
this.BlockPoolMonitorFactory = blockPoolMonitorFactory ?? ((dimensions, logger) => new DefaultEventHubBlockPoolMonitor(dimensions, this.orleansInstruments));
}

private ICacheMonitor CreateDefaultCacheMonitor(EventHubCacheMonitorDimensions dimensions, ILoggerFactory logger) =>
this.orleansInstruments is not null ? new DefaultEventHubCacheMonitor(dimensions, this.orleansInstruments) : new DefaultEventHubCacheMonitor(dimensions);

private IBlockPoolMonitor CreateDefaultBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, ILoggerFactory logger) =>
this.orleansInstruments is not null ? new DefaultEventHubBlockPoolMonitor(dimensions, this.orleansInstruments) : new DefaultEventHubBlockPoolMonitor(dimensions);

/// <summary>
/// Function which create an EventHubQueueCache, which by default will configure the EventHubQueueCache using configuration in CreateBufferPool function
/// and AddCachePressureMonitors function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ public class DefaultEventHubBlockPoolMonitor : DefaultBlockPoolMonitor
/// Constructor
/// </summary>
/// <param name="dimensions"></param>
public DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions) : base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("ObjectPoolId", dimensions.BlockPoolId) })
{
}

internal DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, OrleansInstruments instruments)
public DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, OrleansInstruments instruments)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("ObjectPoolId", dimensions.BlockPoolId) }, instruments)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ public class DefaultEventHubCacheMonitor : DefaultCacheMonitor
/// Constructor
/// </summary>
/// <param name="dimensions"></param>
public DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) })
{
}

internal DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions, OrleansInstruments instruments)
public DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions, OrleansInstruments instruments)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) }, instruments)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ public class DefaultEventHubReceiverMonitor : DefaultQueueAdapterReceiverMonitor
/// Constructor
/// </summary>
/// <param name="dimensions">Aggregation Dimension bag for EventhubReceiverMonitor</param>
public DefaultEventHubReceiverMonitor(EventHubReceiverMonitorDimensions dimensions)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) })
{
}

internal DefaultEventHubReceiverMonitor(EventHubReceiverMonitorDimensions dimensions, OrleansInstruments instruments)
public DefaultEventHubReceiverMonitor(EventHubReceiverMonitorDimensions dimensions, OrleansInstruments instruments)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) }, instruments)
{
}
Expand Down
8 changes: 0 additions & 8 deletions src/Orleans.Core/Diagnostics/Metrics/Instruments.cs

This file was deleted.

20 changes: 2 additions & 18 deletions src/Orleans.Streaming/Common/Monitors/DefaultBlockPoolMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ public class DefaultBlockPoolMonitor : IBlockPoolMonitor
private long _releasedMemory;
private long _allocatedMemory;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultBlockPoolMonitor"/> class.
/// </summary>
protected DefaultBlockPoolMonitor(KeyValuePair<string, object>[] dimensions)
: this(dimensions, Instruments.Meter)
public DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("BlockPoolId", dimensions.BlockPoolId) }, instruments.Meter)
{
}

Expand All @@ -36,11 +33,6 @@ protected DefaultBlockPoolMonitor(KeyValuePair<string, object>[] dimensions, Orl
{
}

internal DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("BlockPoolId", dimensions.BlockPoolId) }, instruments.Meter)
{
}

private DefaultBlockPoolMonitor(KeyValuePair<string, object>[] dimensions, Meter meter)
{
_dimensions = dimensions;
Expand All @@ -51,14 +43,6 @@ private DefaultBlockPoolMonitor(KeyValuePair<string, object>[] dimensions, Meter
_allocatedMemoryCounter = meter.CreateObservableCounter<long>(InstrumentNames.STREAMS_BLOCK_POOL_ALLOCATED_MEMORY, GetAllocatedMemory, unit: "bytes");
}

/// <summary>
/// Initializes a new instance of the <see cref="DefaultBlockPoolMonitor"/> class.
/// </summary>
/// <param name="dimensions">The dimensions.</param>
public DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions) : this(new KeyValuePair<string, object>[] { new ("BlockPoolId", dimensions.BlockPoolId) })
{
}

private Measurement<long> GetTotalMemory() => new(_totalMemory, _dimensions);
private Measurement<long> GetAvailableMemory() => new(_availableMemory, _dimensions);
private Measurement<long> GetClaimedMemory() => new(_claimedMemory, _dimensions);
Expand Down
20 changes: 2 additions & 18 deletions src/Orleans.Streaming/Common/Monitors/DefaultCacheMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ public class DefaultCacheMonitor : ICacheMonitor
private ValueStopwatch _oldestMessageDequeueAgo;
private ValueStopwatch _oldestToNewestAge;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultCacheMonitor"/> class.
/// </summary>
protected DefaultCacheMonitor(KeyValuePair<string, object>[] dimensions)
: this(dimensions, Instruments.Meter)
public DefaultCacheMonitor(CacheMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("QueueId", dimensions.QueueId) }, instruments.Meter)
{
}

Expand All @@ -48,11 +45,6 @@ protected DefaultCacheMonitor(KeyValuePair<string, object>[] dimensions, Orleans
{
}

internal DefaultCacheMonitor(CacheMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("QueueId", dimensions.QueueId) }, instruments.Meter)
{
}

private DefaultCacheMonitor(KeyValuePair<string, object>[] dimensions, Meter meter)
{
_dimensions = dimensions;
Expand All @@ -76,14 +68,6 @@ IEnumerable<Measurement<T>> GetPressureMonitorMeasurement<T>(Func<PressureMonito
}
}

/// <summary>
/// Initializes a new instance of the <see cref="DefaultCacheMonitor"/> class.
/// </summary>
/// <param name="dimensions">The dimensions.</param>
public DefaultCacheMonitor(CacheMonitorDimensions dimensions) : this(new KeyValuePair<string, object>[] { new("QueueId", dimensions.QueueId) })
{
}

private Measurement<long> GetOldestToNewestAge() => new(_oldestToNewestAge.ElapsedTicks, _dimensions);
private Measurement<long> GetOldestAge() => new(_oldestMessageDequeueAgo.ElapsedTicks, _dimensions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,11 @@ public class DefaultQueueAdapterReceiverMonitor : IQueueAdapterReceiverMonitor
private ValueStopwatch _newestMessageReadEnqueueAge;
private long _messagesReceived;

protected DefaultQueueAdapterReceiverMonitor(KeyValuePair<string,object>[] dimensions)
: this(dimensions, Instruments.Meter)
{
}

protected DefaultQueueAdapterReceiverMonitor(KeyValuePair<string, object>[] dimensions, OrleansInstruments instruments)
: this(dimensions, instruments.Meter)
{
}

internal DefaultQueueAdapterReceiverMonitor(ReceiverMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("QueueId", dimensions.QueueId) }, instruments.Meter)
{
}

private DefaultQueueAdapterReceiverMonitor(KeyValuePair<string, object>[] dimensions, Meter meter)
{
_dimensions = dimensions;
Expand All @@ -64,11 +54,8 @@ private DefaultQueueAdapterReceiverMonitor(KeyValuePair<string, object>[] dimens
_newestMessageReadEnqueueTimeToNowCounter = meter.CreateObservableGauge<long>(InstrumentNames.STREAMS_QUEUE_NEWEST_MESSAGE_ENQUEUE_AGE, GetNewestMessageReadEnqueueAge);
}

/// <summary>
/// Initializes a new instance of the <see cref="DefaultQueueAdapterReceiverMonitor"/> class.
/// </summary>
/// <param name="dimensions">The dimensions.</param>
public DefaultQueueAdapterReceiverMonitor(ReceiverMonitorDimensions dimensions) : this(new KeyValuePair<string,object>[] { new("QueueId", dimensions.QueueId) })
public DefaultQueueAdapterReceiverMonitor(ReceiverMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("QueueId", dimensions.QueueId) }, instruments.Meter)
{
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using System.Diagnostics.Metrics;
using System.Threading;
using Orleans.Runtime;
using Orleans.Transactions.Abstractions;

namespace Orleans.Transactions
{
public class TransactionAgentStatistics : ITransactionAgentStatistics
{
private static readonly Meter Meter = new("Orleans");

private const string TRANSACTIONS_STARTED = "orleans-transactions-started";
private const string TRANSACTIONS_SUCCESSFUL = "orleans-transactions-successful";
private const string TRANSACTIONS_FAILED = "orleans-transactions-failed";
Expand All @@ -23,11 +22,21 @@ public class TransactionAgentStatistics : ITransactionAgentStatistics
private long _transactionsThrottled;

public TransactionAgentStatistics()
: this(new Meter("Microsoft.Orleans"))
{
}

public TransactionAgentStatistics(OrleansInstruments instruments)
: this(instruments.Meter)
{
}

private TransactionAgentStatistics(Meter meter)
{
_transactionsStartedCounter = Meter.CreateObservableCounter<long>(TRANSACTIONS_STARTED, () => new(TransactionsStarted));
_transactionsSuccessfulCounter = Meter.CreateObservableCounter<long>(TRANSACTIONS_SUCCESSFUL, () => new(TransactionsSucceeded));
_transactionsFailedCounter = Meter.CreateObservableCounter<long>(TRANSACTIONS_FAILED, () => new(TransactionsFailed));
_transactionsThrottledCounter = Meter.CreateObservableCounter<long>(TRANSACTIONS_THROTTLED, () => new(TransactionsThrottled));
_transactionsStartedCounter = meter.CreateObservableCounter<long>(TRANSACTIONS_STARTED, () => new(TransactionsStarted));
_transactionsSuccessfulCounter = meter.CreateObservableCounter<long>(TRANSACTIONS_SUCCESSFUL, () => new(TransactionsSucceeded));
_transactionsFailedCounter = meter.CreateObservableCounter<long>(TRANSACTIONS_FAILED, () => new(TransactionsFailed));
_transactionsThrottledCounter = meter.CreateObservableCounter<long>(TRANSACTIONS_THROTTLED, () => new(TransactionsThrottled));
}

public long TransactionsStarted => _transactionsStarted;
Expand Down Expand Up @@ -57,13 +66,39 @@ public void TrackTransactionThrottled()

public static ITransactionAgentStatistics Copy(ITransactionAgentStatistics initialStatistics)
{
return new TransactionAgentStatistics
return new TransactionAgentStatisticsSnapshot(initialStatistics);
}

private sealed class TransactionAgentStatisticsSnapshot : ITransactionAgentStatistics
{
private long _transactionsStarted;
private long _transactionsSucceeded;
private long _transactionsFailed;
private long _transactionsThrottled;

public TransactionAgentStatisticsSnapshot(ITransactionAgentStatistics statistics)
{
_transactionsStarted = initialStatistics.TransactionsStarted,
_transactionsSucceeded = initialStatistics.TransactionsSucceeded,
_transactionsFailed = initialStatistics.TransactionsFailed,
_transactionsThrottled = initialStatistics.TransactionsThrottled
};
_transactionsStarted = statistics.TransactionsStarted;
_transactionsSucceeded = statistics.TransactionsSucceeded;
_transactionsFailed = statistics.TransactionsFailed;
_transactionsThrottled = statistics.TransactionsThrottled;
}

public long TransactionsStarted => _transactionsStarted;

public long TransactionsSucceeded => _transactionsSucceeded;

public long TransactionsFailed => _transactionsFailed;

public long TransactionsThrottled => _transactionsThrottled;

public void TrackTransactionStarted() => Interlocked.Increment(ref _transactionsStarted);

public void TrackTransactionSucceeded() => Interlocked.Increment(ref _transactionsSucceeded);

public void TrackTransactionFailed() => Interlocked.Increment(ref _transactionsFailed);

public void TrackTransactionThrottled() => Interlocked.Increment(ref _transactionsThrottled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ internal static IServiceCollection UseTransactionsWithSilo(this IServiceCollecti

internal static IServiceCollection AddTransactionsBaseline(this IServiceCollection services)
{
services.AddMetrics();
services.TryAddSingleton<OrleansInstruments>();
services.TryAddSingleton<IClock, Clock>();
services.AddSingleton<ITransactionAgent, TransactionAgent>();
services.AddSingleton<ITransactionClient, TransactionClient>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public partial class AzureStoragePolicyOptions

public partial class DefaultEventHubReceiverMonitor : Providers.Streams.Common.DefaultQueueAdapterReceiverMonitor
{
public DefaultEventHubReceiverMonitor(EventHubReceiverMonitorDimensions dimensions) : base(default(System.Collections.Generic.KeyValuePair<string, object>[])!) { }
public DefaultEventHubReceiverMonitor(EventHubReceiverMonitorDimensions dimensions, Runtime.OrleansInstruments instruments) : base(default(System.Collections.Generic.KeyValuePair<string, object>[])!, default(Runtime.OrleansInstruments)!) { }
}

public static partial class EventDataExtensions
Expand Down Expand Up @@ -467,7 +467,7 @@ public void SignalPurge() { }

public partial class EventHubQueueCacheFactory : IEventHubQueueCacheFactory
{
public EventHubQueueCacheFactory(Configuration.EventHubStreamCachePressureOptions cacheOptions, Configuration.StreamCacheEvictionOptions evictionOptions, Configuration.StreamStatisticOptions statisticOptions, IEventHubDataAdapter dataAdater, EventHubMonitorAggregationDimensions sharedDimensions, System.Func<EventHubCacheMonitorDimensions, Microsoft.Extensions.Logging.ILoggerFactory, Providers.Streams.Common.ICacheMonitor> cacheMonitorFactory = null, System.Func<EventHubBlockPoolMonitorDimensions, Microsoft.Extensions.Logging.ILoggerFactory, Providers.Streams.Common.IBlockPoolMonitor> blockPoolMonitorFactory = null) { }
public EventHubQueueCacheFactory(Configuration.EventHubStreamCachePressureOptions cacheOptions, Configuration.StreamCacheEvictionOptions evictionOptions, Configuration.StreamStatisticOptions statisticOptions, IEventHubDataAdapter dataAdater, EventHubMonitorAggregationDimensions sharedDimensions, Runtime.OrleansInstruments instruments, System.Func<EventHubCacheMonitorDimensions, Microsoft.Extensions.Logging.ILoggerFactory, Providers.Streams.Common.ICacheMonitor> cacheMonitorFactory = null, System.Func<EventHubBlockPoolMonitorDimensions, Microsoft.Extensions.Logging.ILoggerFactory, Providers.Streams.Common.IBlockPoolMonitor> blockPoolMonitorFactory = null) { }

public System.Func<EventHubBlockPoolMonitorDimensions, Microsoft.Extensions.Logging.ILoggerFactory, Providers.Streams.Common.IBlockPoolMonitor> BlockPoolMonitorFactory { get { throw null; } set { } }

Expand Down Expand Up @@ -585,12 +585,12 @@ namespace Orleans.Streaming.EventHubs.StatisticMonitors
{
public partial class DefaultEventHubBlockPoolMonitor : Providers.Streams.Common.DefaultBlockPoolMonitor
{
public DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions) : base(default(System.Collections.Generic.KeyValuePair<string, object>[])!) { }
public DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, Runtime.OrleansInstruments instruments) : base(default(System.Collections.Generic.KeyValuePair<string, object>[])!, default(Runtime.OrleansInstruments)!) { }
}

public partial class DefaultEventHubCacheMonitor : Providers.Streams.Common.DefaultCacheMonitor
{
public DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions) : base(default(System.Collections.Generic.KeyValuePair<string, object>[])!) { }
public DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions, Runtime.OrleansInstruments instruments) : base(default(System.Collections.Generic.KeyValuePair<string, object>[])!, default(Runtime.OrleansInstruments)!) { }
}
}

Expand Down
5 changes: 0 additions & 5 deletions src/api/Orleans.Core/Orleans.Core.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1425,11 +1425,6 @@ public readonly partial struct IndirectProbeResponse
public override readonly string ToString() { throw null; }
}

public static partial class Instruments
{
public static readonly System.Diagnostics.Metrics.Meter Meter;
}

public partial interface IRingRange
{
bool InRange(GrainId grainId);
Expand Down
Loading
Loading