diff --git a/PowerKit.Tests/ObservableTests.cs b/PowerKit.Tests/ObservableTests.cs new file mode 100644 index 0000000..48564b9 --- /dev/null +++ b/PowerKit.Tests/ObservableTests.cs @@ -0,0 +1,177 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using FluentAssertions; +using PowerKit; +using Xunit; + +namespace PowerKit.Tests; + +file class FakeObserver( + Action? onNext = null, + Action? onError = null, + Action? onCompleted = null +) : IObserver +{ + public void OnNext(T value) => onNext?.Invoke(value); + + public void OnError(Exception error) => onError?.Invoke(error); + + public void OnCompleted() => onCompleted?.Invoke(); +} + +public class ObservableTests +{ + [Fact] + public void Observable_Create_Subscribe_Test() + { + // Arrange + var subscribed = false; + var observable = Observable.Create(_ => + { + subscribed = true; + return Disposable.Null; + }); + + // Act + subscribed.Should().BeFalse(); + observable.Subscribe(new FakeObserver()); + + // Assert + subscribed.Should().BeTrue(); + } + + [Fact] + public void Observable_Create_OnNext_Test() + { + // Arrange + var received = new List(); + var observable = Observable.Create(observer => + { + observer.OnNext(1); + observer.OnNext(2); + observer.OnNext(3); + observer.OnCompleted(); + return Disposable.Null; + }); + + // Act + observable.Subscribe(new FakeObserver(onNext: received.Add)); + + // Assert + received.Should().Equal(1, 2, 3); + } + + [Fact] + public void Observable_Create_OnError_Test() + { + // Arrange + var receivedError = default(Exception); + var observable = Observable.Create(observer => + { + observer.OnError(new InvalidOperationException("test error")); + return Disposable.Null; + }); + + // Act + observable.Subscribe(new FakeObserver(onError: ex => receivedError = ex)); + + // Assert + receivedError.Should().BeOfType(); + receivedError!.Message.Should().Be("test error"); + } + + [Fact] + public void Observable_Create_OnCompleted_Test() + { + // Arrange + var completed = false; + var observable = Observable.Create(observer => + { + observer.OnCompleted(); + return Disposable.Null; + }); + + // Act + observable.Subscribe(new FakeObserver(onCompleted: () => completed = true)); + + // Assert + completed.Should().BeTrue(); + } + + [Fact] + public void Observable_Create_Dispose_Test() + { + // Arrange + var disposed = false; + var observable = Observable.Create(_ => Disposable.Create(() => disposed = true)); + + // Act + disposed.Should().BeFalse(); + var subscription = observable.Subscribe(new FakeObserver()); + subscription.Dispose(); + + // Assert + disposed.Should().BeTrue(); + } + + [Fact] + public void Observable_CreateSynchronized_OnNext_Test() + { + // Arrange + var received = new List(); + var observable = Observable.CreateSynchronized(observer => + { + observer.OnNext(1); + observer.OnNext(2); + observer.OnNext(3); + observer.OnCompleted(); + return Disposable.Null; + }); + + // Act + observable.Subscribe(new FakeObserver(onNext: received.Add)); + + // Assert + received.Should().Equal(1, 2, 3); + } + + [Fact] + public void Observable_CreateSynchronized_ThreadSafe_Test() + { + // Arrange + const int threadCount = 10; + const int valuesPerThread = 100; + var received = new List(); + var observable = Observable.CreateSynchronized(observer => + { + var threads = new List(); + + for (var i = 0; i < threadCount; i++) + { + var thread = new Thread(() => + { + for (var j = 0; j < valuesPerThread; j++) + { + observer.OnNext(j); + } + }); + threads.Add(thread); + } + + foreach (var t in threads) + t.Start(); + foreach (var t in threads) + t.Join(); + + observer.OnCompleted(); + return Disposable.Null; + }); + + // Act + observable.Subscribe(new FakeObserver(onNext: v => received.Add(v))); + + // Assert + received.Should().HaveCount(threadCount * valuesPerThread); + } +} diff --git a/PowerKit/Observable.cs b/PowerKit/Observable.cs new file mode 100644 index 0000000..a483aae --- /dev/null +++ b/PowerKit/Observable.cs @@ -0,0 +1,41 @@ +#nullable enable +using System; +using System.Diagnostics.CodeAnalysis; + +namespace PowerKit; + +#if NET40_OR_GREATER || NETSTANDARD || NET +/// +/// Represents an observable sequence of values. +/// +#if !POWERKIT_INCLUDE_COVERAGE +[ExcludeFromCodeCoverage] +#endif +file class Observable(Func, IDisposable> subscribe) : IObservable +{ + /// + public IDisposable Subscribe(IObserver observer) => subscribe(observer); +} + +/// +/// Provides utility methods for creating instances. +/// +#if !POWERKIT_INCLUDE_COVERAGE +[ExcludeFromCodeCoverage] +#endif +internal static class Observable +{ + /// + /// Creates an observable that invokes the specified subscribe function when subscribed to. + /// + public static IObservable Create(Func, IDisposable> subscribe) => + new Observable(subscribe); + + /// + /// Creates an observable that invokes the specified subscribe function when subscribed to, + /// wrapping the observer in a to ensure thread safety. + /// + public static IObservable CreateSynchronized(Func, IDisposable> subscribe) => + Create(observer => subscribe(new SynchronizedObserver(observer))); +} +#endif diff --git a/PowerKit/SynchronizedObserver.cs b/PowerKit/SynchronizedObserver.cs new file mode 100644 index 0000000..5c18721 --- /dev/null +++ b/PowerKit/SynchronizedObserver.cs @@ -0,0 +1,46 @@ +#nullable enable +using System; +using System.Diagnostics.CodeAnalysis; +using System.Threading; + +namespace PowerKit; + +#if NET40_OR_GREATER || NETSTANDARD || NET +/// +/// An observer that synchronizes access to the underlying observer. +/// +#if !POWERKIT_INCLUDE_COVERAGE +[ExcludeFromCodeCoverage] +#endif +internal class SynchronizedObserver(IObserver observer) : IObserver +{ + private readonly Lock _lock = new(); + + /// + public void OnCompleted() + { + lock (_lock) + { + observer.OnCompleted(); + } + } + + /// + public void OnError(Exception error) + { + lock (_lock) + { + observer.OnError(error); + } + } + + /// + public void OnNext(T value) + { + lock (_lock) + { + observer.OnNext(value); + } + } +} +#endif