Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions PowerKit.Tests/ObservableTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
using System;
using System.Collections.Generic;
using System.Threading;
using FluentAssertions;
using PowerKit;
using Xunit;

namespace PowerKit.Tests;

file class FakeObserver<T>(
Action<T>? onNext = null,
Action<Exception>? onError = null,
Action? onCompleted = null
) : IObserver<T>
{
public void OnNext(T value) => onNext?.Invoke(value);

public void OnError(Exception error) => onError?.Invoke(error);

public void OnCompleted() => onCompleted?.Invoke();
}

public class ObservableTests
{
[Fact]
public void Observable_Create_Subscribe_Test()
{
// Arrange
var subscribed = false;
var observable = Observable.Create<int>(_ =>
{
subscribed = true;
return Disposable.Null;
});

// Act
subscribed.Should().BeFalse();
observable.Subscribe(new FakeObserver<int>());

// Assert
subscribed.Should().BeTrue();
}

[Fact]
public void Observable_Create_OnNext_Test()
{
// Arrange
var received = new List<int>();
var observable = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return Disposable.Null;
});

// Act
observable.Subscribe(new FakeObserver<int>(onNext: received.Add));

// Assert
received.Should().Equal(1, 2, 3);
}

[Fact]
public void Observable_Create_OnError_Test()
{
// Arrange
var receivedError = default(Exception);
var observable = Observable.Create<int>(observer =>
{
observer.OnError(new InvalidOperationException("test error"));
return Disposable.Null;
});

// Act
observable.Subscribe(new FakeObserver<int>(onError: ex => receivedError = ex));

// Assert
receivedError.Should().BeOfType<InvalidOperationException>();
receivedError!.Message.Should().Be("test error");
}

[Fact]
public void Observable_Create_OnCompleted_Test()
{
// Arrange
var completed = false;
var observable = Observable.Create<int>(observer =>
{
observer.OnCompleted();
return Disposable.Null;
});

// Act
observable.Subscribe(new FakeObserver<int>(onCompleted: () => completed = true));

// Assert
completed.Should().BeTrue();
}

[Fact]
public void Observable_Create_Dispose_Test()
{
// Arrange
var disposed = false;
var observable = Observable.Create<int>(_ => Disposable.Create(() => disposed = true));

// Act
disposed.Should().BeFalse();
var subscription = observable.Subscribe(new FakeObserver<int>());
subscription.Dispose();

// Assert
disposed.Should().BeTrue();
}

[Fact]
public void Observable_CreateSynchronized_OnNext_Test()
{
// Arrange
var received = new List<int>();
var observable = Observable.CreateSynchronized<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return Disposable.Null;
});

// Act
observable.Subscribe(new FakeObserver<int>(onNext: received.Add));

// Assert
received.Should().Equal(1, 2, 3);
}

[Fact]
public void Observable_CreateSynchronized_ThreadSafe_Test()
{
// Arrange
const int threadCount = 10;
const int valuesPerThread = 100;
var received = new List<int>();
var observable = Observable.CreateSynchronized<int>(observer =>
{
var threads = new List<Thread>();

for (var i = 0; i < threadCount; i++)
{
var thread = new Thread(() =>
{
for (var j = 0; j < valuesPerThread; j++)
{
observer.OnNext(j);
}
});
threads.Add(thread);
}

foreach (var t in threads)
t.Start();
foreach (var t in threads)
t.Join();

observer.OnCompleted();
return Disposable.Null;
});

// Act
observable.Subscribe(new FakeObserver<int>(onNext: v => received.Add(v)));

// Assert
received.Should().HaveCount(threadCount * valuesPerThread);
}
}
41 changes: 41 additions & 0 deletions PowerKit/Observable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#nullable enable
using System;
using System.Diagnostics.CodeAnalysis;

namespace PowerKit;

#if NET40_OR_GREATER || NETSTANDARD || NET
/// <summary>
/// Represents an observable sequence of values.
/// </summary>
#if !POWERKIT_INCLUDE_COVERAGE
[ExcludeFromCodeCoverage]
#endif
file class Observable<T>(Func<IObserver<T>, IDisposable> subscribe) : IObservable<T>
{
/// <inheritdoc />
public IDisposable Subscribe(IObserver<T> observer) => subscribe(observer);
}

/// <summary>
/// Provides utility methods for creating <see cref="IObservable{T}" /> instances.
/// </summary>
#if !POWERKIT_INCLUDE_COVERAGE
[ExcludeFromCodeCoverage]
#endif
internal static class Observable
{
/// <summary>
/// Creates an observable that invokes the specified subscribe function when subscribed to.
/// </summary>
public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe) =>
new Observable<T>(subscribe);

/// <summary>
/// Creates an observable that invokes the specified subscribe function when subscribed to,
/// wrapping the observer in a <see cref="SynchronizedObserver{T}" /> to ensure thread safety.
/// </summary>
public static IObservable<T> CreateSynchronized<T>(Func<IObserver<T>, IDisposable> subscribe) =>
Create<T>(observer => subscribe(new SynchronizedObserver<T>(observer)));
}
#endif
46 changes: 46 additions & 0 deletions PowerKit/SynchronizedObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#nullable enable
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace PowerKit;

#if NET40_OR_GREATER || NETSTANDARD || NET
/// <summary>
/// An observer that synchronizes access to the underlying observer.
/// </summary>
#if !POWERKIT_INCLUDE_COVERAGE
[ExcludeFromCodeCoverage]
#endif
internal class SynchronizedObserver<T>(IObserver<T> observer) : IObserver<T>
{
private readonly Lock _lock = new();

/// <inheritdoc />
public void OnCompleted()
{
lock (_lock)
{
observer.OnCompleted();
}
}

/// <inheritdoc />
public void OnError(Exception error)
{
lock (_lock)
{
observer.OnError(error);
}
}

/// <inheritdoc />
public void OnNext(T value)
{
lock (_lock)
{
observer.OnNext(value);
}
}
}
#endif