Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ protected virtual IEventHubQueueCacheFactory CreateCacheFactory(EventHubStreamCa
{
var eventHubPath = this.ehOptions.EventHubName;
var sharedDimensions = new EventHubMonitorAggregationDimensions(eventHubPath);
return new EventHubQueueCacheFactory(eventHubCacheOptions, cacheEvictionOptions, statisticOptions, this.dataAdapter, sharedDimensions);
return new EventHubQueueCacheFactory(eventHubCacheOptions, cacheEvictionOptions, statisticOptions, this.dataAdapter, sharedDimensions, this.orleansInstruments);
}

private EventHubAdapterReceiver MakeReceiver(QueueId queueId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;
using Orleans.Streams;
using Orleans.Streaming.EventHubs.StatisticMonitors;

Expand All @@ -19,6 +20,7 @@ public class EventHubQueueCacheFactory : IEventHubQueueCacheFactory
private readonly IEventHubDataAdapter dataAdater;
private readonly TimePurgePredicate timePurge;
private readonly EventHubMonitorAggregationDimensions sharedDimensions;
private readonly OrleansInstruments orleansInstruments;
private IObjectPool<FixedSizeBuffer> bufferPool;
private string bufferPoolId;

Expand All @@ -45,17 +47,37 @@ public EventHubQueueCacheFactory(
EventHubMonitorAggregationDimensions sharedDimensions,
Func<EventHubCacheMonitorDimensions, ILoggerFactory, ICacheMonitor> cacheMonitorFactory = null,
Func<EventHubBlockPoolMonitorDimensions, ILoggerFactory, IBlockPoolMonitor> blockPoolMonitorFactory = null)
: this(cacheOptions, evictionOptions, statisticOptions, dataAdater, sharedDimensions, null, cacheMonitorFactory, blockPoolMonitorFactory)
{
}

internal EventHubQueueCacheFactory(
EventHubStreamCachePressureOptions cacheOptions,
StreamCacheEvictionOptions evictionOptions,
StreamStatisticOptions statisticOptions,
IEventHubDataAdapter dataAdater,
EventHubMonitorAggregationDimensions sharedDimensions,
OrleansInstruments instruments,
Func<EventHubCacheMonitorDimensions, ILoggerFactory, ICacheMonitor> cacheMonitorFactory = null,
Func<EventHubBlockPoolMonitorDimensions, ILoggerFactory, IBlockPoolMonitor> blockPoolMonitorFactory = null)
{
this.cacheOptions = cacheOptions;
this.evictionOptions = evictionOptions;
this.statisticOptions = statisticOptions;
this.dataAdater = dataAdater;
this.timePurge = new TimePurgePredicate(evictionOptions.DataMinTimeInCache, evictionOptions.DataMaxAgeInCache);
this.sharedDimensions = sharedDimensions;
this.CacheMonitorFactory = cacheMonitorFactory ?? ((dimensions, logger) => new DefaultEventHubCacheMonitor(dimensions));
this.BlockPoolMonitorFactory = blockPoolMonitorFactory ?? ((dimensions, logger) => new DefaultEventHubBlockPoolMonitor(dimensions));
this.orleansInstruments = instruments;
this.CacheMonitorFactory = cacheMonitorFactory ?? CreateDefaultCacheMonitor;
this.BlockPoolMonitorFactory = blockPoolMonitorFactory ?? CreateDefaultBlockPoolMonitor;
}

private ICacheMonitor CreateDefaultCacheMonitor(EventHubCacheMonitorDimensions dimensions, ILoggerFactory logger) =>
this.orleansInstruments is not null ? new DefaultEventHubCacheMonitor(dimensions, this.orleansInstruments) : new DefaultEventHubCacheMonitor(dimensions);

private IBlockPoolMonitor CreateDefaultBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, ILoggerFactory logger) =>
this.orleansInstruments is not null ? new DefaultEventHubBlockPoolMonitor(dimensions, this.orleansInstruments) : new DefaultEventHubBlockPoolMonitor(dimensions);

/// <summary>
/// Function which create an EventHubQueueCache, which by default will configure the EventHubQueueCache using configuration in CreateBufferPool function
/// and AddCachePressureMonitors function.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;

namespace Orleans.Streaming.EventHubs.StatisticMonitors
{
Expand All @@ -15,5 +16,10 @@ public class DefaultEventHubBlockPoolMonitor : DefaultBlockPoolMonitor
public DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions) : base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("ObjectPoolId", dimensions.BlockPoolId) })
{
}

internal DefaultEventHubBlockPoolMonitor(EventHubBlockPoolMonitorDimensions dimensions, OrleansInstruments instruments)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("ObjectPoolId", dimensions.BlockPoolId) }, instruments)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;

namespace Orleans.Streaming.EventHubs.StatisticMonitors
{
Expand All @@ -16,5 +17,10 @@ public DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) })
{
}

internal DefaultEventHubCacheMonitor(EventHubCacheMonitorDimensions dimensions, OrleansInstruments instruments)
: base(new KeyValuePair<string, object>[] { new("Path", dimensions.EventHubPath), new("Partition", dimensions.EventHubPartition) }, instruments)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ protected DefaultBlockPoolMonitor(KeyValuePair<string, object>[] dimensions)
{
}

protected DefaultBlockPoolMonitor(KeyValuePair<string, object>[] dimensions, OrleansInstruments instruments)
: this(dimensions, instruments.Meter)
{
}

internal DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("BlockPoolId", dimensions.BlockPoolId) }, instruments.Meter)
{
Expand Down
5 changes: 5 additions & 0 deletions src/Orleans.Streaming/Common/Monitors/DefaultCacheMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ protected DefaultCacheMonitor(KeyValuePair<string, object>[] dimensions)
{
}

protected DefaultCacheMonitor(KeyValuePair<string, object>[] dimensions, OrleansInstruments instruments)
: this(dimensions, instruments.Meter)
{
}

internal DefaultCacheMonitor(CacheMonitorDimensions dimensions, OrleansInstruments instruments)
: this(new KeyValuePair<string, object>[] { new("QueueId", dimensions.QueueId) }, instruments.Meter)
{
Expand Down
4 changes: 4 additions & 0 deletions src/api/Orleans.Streaming/Orleans.Streaming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ public DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions) { }

protected DefaultBlockPoolMonitor(System.Collections.Generic.KeyValuePair<string, object>[] dimensions) { }

protected DefaultBlockPoolMonitor(System.Collections.Generic.KeyValuePair<string, object>[] dimensions, Runtime.OrleansInstruments instruments) { }

public void Report(long totalMemoryInByte, long availableMemoryInByte, long claimedMemoryInByte) { }

public void TrackMemoryAllocated(long allocatedMemoryInByte) { }
Expand All @@ -566,6 +568,8 @@ public DefaultCacheMonitor(CacheMonitorDimensions dimensions) { }

protected DefaultCacheMonitor(System.Collections.Generic.KeyValuePair<string, object>[] dimensions) { }

protected DefaultCacheMonitor(System.Collections.Generic.KeyValuePair<string, object>[] dimensions, Runtime.OrleansInstruments instruments) { }

public void ReportCacheSize(long totalCacheSizeInByte) { }

public void ReportMessageStatistics(System.DateTime? oldestMessageEnqueueTimeUtc, System.DateTime? oldestMessageDequeueTimeUtc, System.DateTime? newestMessageEnqueueTimeUtc, long totalMessageCount) { }
Expand Down
Loading