Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions PowerKit.Tests/ResizableSemaphoreTests.cs
Comment thread
Tyrrrz marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using PowerKit;
using Xunit;

namespace PowerKit.Tests;

public class ResizableSemaphoreTests
{
[Fact]
public async Task AcquireAsync_WithinMaxCount_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 2 };

// Act
using var access1 = await semaphore.AcquireAsync();
using var access2 = await semaphore.AcquireAsync();

// Assert
access1.Should().NotBeNull();
access2.Should().NotBeNull();
}

[Fact]
public async Task AcquireAsync_BlocksWhenMaxCountReached_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 1 };
using var access1 = await semaphore.AcquireAsync();

// Act
var acquireTask = semaphore.AcquireAsync();

// Assert
acquireTask.IsCompleted.Should().BeFalse();

// Release and let the second acquire complete
access1.Dispose();
using var access2 = await acquireTask;
access2.Should().NotBeNull();
}

[Fact]
public async Task AcquireAsync_CancellationToken_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 1 };
using var access = await semaphore.AcquireAsync();
using var cts = new CancellationTokenSource();

// Act
var acquireTask = semaphore.AcquireAsync(cts.Token);
cts.Cancel();

// Assert
await acquireTask.Awaiting(t => t).Should().ThrowAsync<OperationCanceledException>();
}

Comment thread
Tyrrrz marked this conversation as resolved.
[Fact]
public async Task AcquireAsync_Dispose_CancelsWaiters_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 1 };
using var access = await semaphore.AcquireAsync();

// Act
var acquireTask = semaphore.AcquireAsync();
semaphore.Dispose();

// Assert
await acquireTask.Awaiting(t => t).Should().ThrowAsync<OperationCanceledException>();
}

[Fact]
public async Task AcquireAsync_AfterDispose_Throws_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore();
semaphore.Dispose();

// Act & assert
await semaphore
.Awaiting(s => s.AcquireAsync())
.Should()
.ThrowAsync<ObjectDisposedException>();
}

[Fact]
public async Task MaxCount_IncreasedUnblocksWaiters_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 1 };
using var access1 = await semaphore.AcquireAsync();

// Act
var acquireTask = semaphore.AcquireAsync();
semaphore.MaxCount = 2;

// Assert
using var access2 = await acquireTask;
access2.Should().NotBeNull();
}

[Fact]
public async Task Release_AllowsNextWaiter_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 1 };

// Act
var access1 = await semaphore.AcquireAsync();
var acquireTask = semaphore.AcquireAsync();

access1.Dispose();

// Assert
using var access2 = await acquireTask;
access2.Should().NotBeNull();
}

[Fact]
public async Task Release_DoubleDispose_OnlyReleasesOnce_Test()
{
// Arrange
using var semaphore = new ResizableSemaphore { MaxCount = 1 };
var access = await semaphore.AcquireAsync();

// Act
access.Dispose();
access.Dispose();

// Assert: should still be able to acquire once (count wasn't double-decremented)
using var access2 = await semaphore.AcquireAsync();
access2.Should().NotBeNull();
}
}
129 changes: 129 additions & 0 deletions PowerKit/ResizableSemaphore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#if NET40_OR_GREATER || NETSTANDARD || NET
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

namespace PowerKit;

#if !POWERKIT_INCLUDE_COVERAGE
[ExcludeFromCodeCoverage]
#endif
file class ResizableSemaphoreAccess(ResizableSemaphore semaphore) : IDisposable
{
private bool _isDisposed;

public void Dispose()
{
if (!_isDisposed)
{
semaphore.Release();
}

_isDisposed = true;
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated
}
}
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated

/// <summary>
/// Like a regular semaphore, but the max count can be changed at any point.
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated
/// </summary>
#if !POWERKIT_INCLUDE_COVERAGE
[ExcludeFromCodeCoverage]
#endif
internal class ResizableSemaphore : IDisposable
{
private readonly Lock _lock = new();
private readonly Queue<TaskCompletionSource> _waiters = new();
private readonly CancellationTokenSource _cts = new();

private bool _isDisposed;
private int _maxCount = int.MaxValue;
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated
private int _count;

/// <summary>
/// Gets or sets the maximum number of concurrent accesses.
/// Defaults to <see cref="int.MaxValue" />.
/// </summary>
public int MaxCount
{
get
{
using (_lock.EnterScope())
{
return _maxCount;
}
}
set
{
using (_lock.EnterScope())
{
_maxCount = value;
Refresh();
}
}
Comment thread
Tyrrrz marked this conversation as resolved.
}

// Must be called while holding the lock.
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated
private void Refresh()
{
// Provide access to pending waiters, as long as max count allows.
while (_count < _maxCount && _waiters.TryDequeue(out var waiter))
{
// Don't increment the count if the waiter has already been
// completed before (most likely by getting canceled).
if (waiter!.TrySetResult())
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated
_count++;
}
}

/// <summary>
/// Acquires access to the semaphore, waiting asynchronously if the max count has been reached.
/// Dispose the returned handle to release access.
/// </summary>
public async Task<IDisposable> AcquireAsync(CancellationToken cancellationToken = default)
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().Name);
Comment thread
Tyrrrz marked this conversation as resolved.
Outdated

var waiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

using var ctsRegistration = _cts.Token.Register(() => waiter.TrySetCanceled(_cts.Token));
using var ctRegistration = cancellationToken.Register(() =>
waiter.TrySetCanceled(cancellationToken)
);

using (_lock.EnterScope())
{
_waiters.Enqueue(waiter);
Refresh();
}
Comment thread
Tyrrrz marked this conversation as resolved.

await waiter.Task.ConfigureAwait(false);

return new ResizableSemaphoreAccess(this);
}

internal void Release()
{
using (_lock.EnterScope())
{
_count--;
Refresh();
}
}

/// <inheritdoc />
public void Dispose()
{
if (!_isDisposed)
{
_cts.Cancel();
_cts.Dispose();
}

_isDisposed = true;
}
}
#endif