From 9404369f1da7d11328355b27240d69a9a9abaf49 Mon Sep 17 00:00:00 2001 From: Meir Blachman Date: Wed, 10 Jun 2026 15:08:00 +0300 Subject: [PATCH] chore: use meter factory for Event Hubs cache metrics Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../EventHub/EventHubAdapterFactory.cs | 2 +- .../EventHub/EventHubQueueCacheFactory.cs | 26 +++++++++++++++++-- .../DefaultEventHubBlockPoolMonitor.cs | 6 +++++ .../DefaultEventHubCacheMonitor.cs | 6 +++++ .../Monitors/DefaultBlockPoolMonitor.cs | 5 ++++ .../Common/Monitors/DefaultCacheMonitor.cs | 5 ++++ .../Orleans.Streaming/Orleans.Streaming.cs | 4 +++ 7 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterFactory.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterFactory.cs index 27a9d81db11..748e5dc17ab 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterFactory.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterFactory.cs @@ -265,7 +265,7 @@ protected virtual IEventHubQueueCacheFactory CreateCacheFactory(EventHubStreamCa { var eventHubPath = this.ehOptions.EventHubName; var sharedDimensions = new EventHubMonitorAggregationDimensions(eventHubPath); - return new EventHubQueueCacheFactory(eventHubCacheOptions, cacheEvictionOptions, statisticOptions, this.dataAdapter, sharedDimensions); + return new EventHubQueueCacheFactory(eventHubCacheOptions, cacheEvictionOptions, statisticOptions, this.dataAdapter, sharedDimensions, this.orleansInstruments); } private EventHubAdapterReceiver MakeReceiver(QueueId queueId) diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubQueueCacheFactory.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubQueueCacheFactory.cs index c323c93243a..f59604c7246 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubQueueCacheFactory.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubQueueCacheFactory.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using Orleans.Configuration; using Orleans.Providers.Streams.Common; +using Orleans.Runtime; using Orleans.Streams; using Orleans.Streaming.EventHubs.StatisticMonitors; @@ -19,6 +20,7 @@ public class EventHubQueueCacheFactory : IEventHubQueueCacheFactory private readonly IEventHubDataAdapter dataAdater; private readonly TimePurgePredicate timePurge; private readonly EventHubMonitorAggregationDimensions sharedDimensions; + private readonly OrleansInstruments orleansInstruments; private IObjectPool bufferPool; private string bufferPoolId; @@ -45,6 +47,19 @@ public EventHubQueueCacheFactory( EventHubMonitorAggregationDimensions sharedDimensions, Func cacheMonitorFactory = null, Func blockPoolMonitorFactory = null) + : this(cacheOptions, evictionOptions, statisticOptions, dataAdater, sharedDimensions, null, cacheMonitorFactory, blockPoolMonitorFactory) + { + } + + internal EventHubQueueCacheFactory( + EventHubStreamCachePressureOptions cacheOptions, + StreamCacheEvictionOptions evictionOptions, + StreamStatisticOptions statisticOptions, + IEventHubDataAdapter dataAdater, + EventHubMonitorAggregationDimensions sharedDimensions, + OrleansInstruments instruments, + Func cacheMonitorFactory = null, + Func blockPoolMonitorFactory = null) { this.cacheOptions = cacheOptions; this.evictionOptions = evictionOptions; @@ -52,10 +67,17 @@ public EventHubQueueCacheFactory( this.dataAdater = dataAdater; this.timePurge = new TimePurgePredicate(evictionOptions.DataMinTimeInCache, evictionOptions.DataMaxAgeInCache); this.sharedDimensions = sharedDimensions; - this.CacheMonitorFactory = cacheMonitorFactory ?? ((dimensions, logger) => new DefaultEventHubCacheMonitor(dimensions)); - this.BlockPoolMonitorFactory = blockPoolMonitorFactory ?? ((dimensions, logger) => new DefaultEventHubBlockPoolMonitor(dimensions)); + this.orleansInstruments = instruments; + this.CacheMonitorFactory = cacheMonitorFactory ?? CreateDefaultCacheMonitor; + this.BlockPoolMonitorFactory = blockPoolMonitorFactory ?? CreateDefaultBlockPoolMonitor; } + private ICacheMonitor CreateDefaultCacheMonitor(EventHubCacheMonitorDimensions dimensions, ILoggerFactory logger) => + this.orleansInstruments is not null ? new DefaultEventHubCacheMonitor(dimensions, this.orleansInstruments) : new DefaultEventHubCacheMonitor(dimensions); + + private IBlockPoolMonitor CreateDefaultBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, ILoggerFactory logger) => + this.orleansInstruments is not null ? new DefaultEventHubBlockPoolMonitor(dimensions, this.orleansInstruments) : new DefaultEventHubBlockPoolMonitor(dimensions); + /// /// Function which create an EventHubQueueCache, which by default will configure the EventHubQueueCache using configuration in CreateBufferPool function /// and AddCachePressureMonitors function. diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubBlockPoolMonitor.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubBlockPoolMonitor.cs index e1125782bff..cdd60925bad 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubBlockPoolMonitor.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubBlockPoolMonitor.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using Orleans.Providers.Streams.Common; +using Orleans.Runtime; namespace Orleans.Streaming.EventHubs.StatisticMonitors { @@ -15,5 +16,10 @@ public class DefaultEventHubBlockPoolMonitor : DefaultBlockPoolMonitor public DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions) : base(new KeyValuePair[] { new("Path", dimensions.EventHubPath), new("ObjectPoolId", dimensions.BlockPoolId) }) { } + + internal DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, OrleansInstruments instruments) + : base(new KeyValuePair[] { new("Path", dimensions.EventHubPath), new("ObjectPoolId", dimensions.BlockPoolId) }, instruments) + { + } } } diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubCacheMonitor.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubCacheMonitor.cs index 8d8d6e5076f..367348d8f1d 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubCacheMonitor.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/StatisticMonitors/DefaultEventHubCacheMonitor.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using Orleans.Providers.Streams.Common; +using Orleans.Runtime; namespace Orleans.Streaming.EventHubs.StatisticMonitors { @@ -16,5 +17,10 @@ public DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions) : base(new KeyValuePair[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) }) { } + + internal DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions, OrleansInstruments instruments) + : base(new KeyValuePair[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) }, instruments) + { + } } } diff --git a/src/Orleans.Streaming/Common/Monitors/DefaultBlockPoolMonitor.cs b/src/Orleans.Streaming/Common/Monitors/DefaultBlockPoolMonitor.cs index cd5720370db..cae31714fb6 100644 --- a/src/Orleans.Streaming/Common/Monitors/DefaultBlockPoolMonitor.cs +++ b/src/Orleans.Streaming/Common/Monitors/DefaultBlockPoolMonitor.cs @@ -31,6 +31,11 @@ protected DefaultBlockPoolMonitor(KeyValuePair[] dimensions) { } + protected DefaultBlockPoolMonitor(KeyValuePair[] dimensions, OrleansInstruments instruments) + : this(dimensions, instruments.Meter) + { + } + internal DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions, OrleansInstruments instruments) : this(new KeyValuePair[] { new("BlockPoolId", dimensions.BlockPoolId) }, instruments.Meter) { diff --git a/src/Orleans.Streaming/Common/Monitors/DefaultCacheMonitor.cs b/src/Orleans.Streaming/Common/Monitors/DefaultCacheMonitor.cs index 5ca17181b4c..1c6f13086c1 100644 --- a/src/Orleans.Streaming/Common/Monitors/DefaultCacheMonitor.cs +++ b/src/Orleans.Streaming/Common/Monitors/DefaultCacheMonitor.cs @@ -43,6 +43,11 @@ protected DefaultCacheMonitor(KeyValuePair[] dimensions) { } + protected DefaultCacheMonitor(KeyValuePair[] dimensions, OrleansInstruments instruments) + : this(dimensions, instruments.Meter) + { + } + internal DefaultCacheMonitor(CacheMonitorDimensions dimensions, OrleansInstruments instruments) : this(new KeyValuePair[] { new("QueueId", dimensions.QueueId) }, instruments.Meter) { diff --git a/src/api/Orleans.Streaming/Orleans.Streaming.cs b/src/api/Orleans.Streaming/Orleans.Streaming.cs index d12fcd04855..0fafb3b8f4c 100644 --- a/src/api/Orleans.Streaming/Orleans.Streaming.cs +++ b/src/api/Orleans.Streaming/Orleans.Streaming.cs @@ -553,6 +553,8 @@ public DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions) { } protected DefaultBlockPoolMonitor(System.Collections.Generic.KeyValuePair[] dimensions) { } + protected DefaultBlockPoolMonitor(System.Collections.Generic.KeyValuePair[] dimensions, Runtime.OrleansInstruments instruments) { } + public void Report(long totalMemoryInByte, long availableMemoryInByte, long claimedMemoryInByte) { } public void TrackMemoryAllocated(long allocatedMemoryInByte) { } @@ -566,6 +568,8 @@ public DefaultCacheMonitor(CacheMonitorDimensions dimensions) { } protected DefaultCacheMonitor(System.Collections.Generic.KeyValuePair[] dimensions) { } + protected DefaultCacheMonitor(System.Collections.Generic.KeyValuePair[] dimensions, Runtime.OrleansInstruments instruments) { } + public void ReportCacheSize(long totalCacheSizeInByte) { } public void ReportMessageStatistics(System.DateTime? oldestMessageEnqueueTimeUtc, System.DateTime? oldestMessageDequeueTimeUtc, System.DateTime? newestMessageEnqueueTimeUtc, long totalMessageCount) { }