diff --git a/test/Dapr.Workflow.Abstractions.Test/WorkflowContextWaitForExternalEventTests.cs b/test/Dapr.Workflow.Abstractions.Test/WorkflowContextWaitForExternalEventTests.cs new file mode 100644 index 000000000..6d8d53b93 --- /dev/null +++ b/test/Dapr.Workflow.Abstractions.Test/WorkflowContextWaitForExternalEventTests.cs @@ -0,0 +1,127 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow.Abstractions.Test; + +public class WorkflowContextWaitForExternalEventTests +{ + private sealed class ExternalEventProbeContext : WorkflowContext + { + private readonly DateTime _now; + + public ExternalEventProbeContext(DateTime now) + { + _now = now; + } + + public TaskCompletionSource EventTcs { get; } = + new(TaskCreationOptions.RunContinuationsAsynchronously); + public TaskCompletionSource TimerTcs { get; } = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + public DateTime? LastTimerFireAt { get; private set; } + public CancellationToken LastTimerToken { get; private set; } + public CancellationToken LastEventToken { get; private set; } + + public override string Name => "wf"; + public override string InstanceId => "id-1"; + public override DateTime CurrentUtcDateTime => _now; + public override bool IsReplaying => false; + public override bool IsPatched(string patchName) => false; + + public override Task WaitForExternalEventAsync(string eventName, CancellationToken cancellationToken = default) + { + LastEventToken = cancellationToken; + if (cancellationToken.CanBeCanceled) + { + cancellationToken.Register(() => EventTcs.TrySetCanceled(cancellationToken)); + } + + if (typeof(TEvent) != typeof(TEventPayload)) + { + throw new NotSupportedException($"Unsupported event type: {typeof(TEvent).FullName}"); + } + + return (Task)(object)EventTcs.Task; + } + + public override Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken) + { + LastTimerFireAt = fireAt; + LastTimerToken = cancellationToken; + if (cancellationToken.CanBeCanceled) + { + cancellationToken.Register(() => TimerTcs.TrySetCanceled(cancellationToken)); + } + + return TimerTcs.Task; + } + + public override Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null) + => Task.FromResult(default(T)!); + public override void SendEvent(string instanceId, string eventName, object payload) { } + public override void SetCustomStatus(object? customStatus) { } + public override Task CallChildWorkflowAsync(string workflowName, object? input = null, ChildWorkflowTaskOptions? options = null) + => Task.FromResult(default(TResult)!); + public override Microsoft.Extensions.Logging.ILogger CreateReplaySafeLogger(string categoryName) => new NullLogger(); + public override Microsoft.Extensions.Logging.ILogger CreateReplaySafeLogger(Type type) => new NullLogger(); + public override Microsoft.Extensions.Logging.ILogger CreateReplaySafeLogger() => new NullLogger(); + public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true) { } + public override Guid NewGuid() => Guid.Empty; + + private sealed class NullLogger : Microsoft.Extensions.Logging.ILogger + { + IDisposable? Microsoft.Extensions.Logging.ILogger.BeginScope(TState state) => null; + bool Microsoft.Extensions.Logging.ILogger.IsEnabled(Microsoft.Extensions.Logging.LogLevel logLevel) => false; + void Microsoft.Extensions.Logging.ILogger.Log(Microsoft.Extensions.Logging.LogLevel logLevel, Microsoft.Extensions.Logging.EventId eventId, TState state, Exception? exception, Func formatter) { } + } + } + + [Fact] + public async Task WaitForExternalEventAsync_WithTimeout_Returns_Event_When_Event_Wins() + { + var now = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc); + var ctx = new ExternalEventProbeContext(now); + var timeout = TimeSpan.FromMinutes(1); + + Task task = ctx.WaitForExternalEventAsync("evt", timeout); + + Assert.Equal(now.Add(timeout), ctx.LastTimerFireAt); + + ctx.EventTcs.TrySetResult("payload"); + var result = await task; + + Assert.Equal("payload", result); + Assert.False(ctx.LastEventToken.IsCancellationRequested); + Assert.True(ctx.LastTimerToken.IsCancellationRequested); + Assert.True(ctx.TimerTcs.Task.IsCanceled); + } + + [Fact] + public async Task WaitForExternalEventAsync_WithTimeout_Throws_When_Timer_Wins() + { + var now = new DateTime(2025, 1, 1, 0, 0, 0, DateTimeKind.Utc); + var ctx = new ExternalEventProbeContext(now); + var timeout = TimeSpan.FromMinutes(1); + + Task task = ctx.WaitForExternalEventAsync("evt", timeout); + + ctx.TimerTcs.TrySetResult(null); + + await Assert.ThrowsAsync(() => task); + + Assert.True(ctx.LastEventToken.IsCancellationRequested); + Assert.True(ctx.EventTcs.Task.IsCanceled); + Assert.False(ctx.LastTimerToken.IsCancellationRequested); + } +} diff --git a/test/Dapr.Workflow.Abstractions.Test/WorkflowTaskFailedExceptionTests.cs b/test/Dapr.Workflow.Abstractions.Test/WorkflowTaskFailedExceptionTests.cs new file mode 100644 index 000000000..aee65fbef --- /dev/null +++ b/test/Dapr.Workflow.Abstractions.Test/WorkflowTaskFailedExceptionTests.cs @@ -0,0 +1,33 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow.Abstractions.Test; + +public class WorkflowTaskFailedExceptionTests +{ + [Fact] + public void Constructor_Sets_Message_And_FailureDetails() + { + var details = new WorkflowTaskFailureDetails(typeof(InvalidOperationException).FullName!, "boom"); + var ex = new WorkflowTaskFailedException("task failed", details); + + Assert.Equal("task failed", ex.Message); + Assert.Same(details, ex.FailureDetails); + } + + [Fact] + public void Constructor_With_Null_FailureDetails_Throws() + { + Assert.Throws(() => _ = new WorkflowTaskFailedException("task failed", null!)); + } +} diff --git a/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs b/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs index b3a759093..a701854fe 100644 --- a/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs +++ b/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs @@ -734,8 +734,8 @@ private sealed class StubSerializer : IWorkflowSerializer public string Serialize(object? value, Type? inputType = null) => value is null ? string.Empty : SerializeResult; - public T? Deserialize(string? data) => throw new NotSupportedException(); + public T Deserialize(string? data) => throw new NotSupportedException(); - public object? Deserialize(string? data, Type returnType) => throw new NotSupportedException(); + public object Deserialize(string? data, Type returnType) => throw new NotSupportedException(); } } diff --git a/test/Dapr.Workflow.Test/Client/WorkflowMetadataTests.cs b/test/Dapr.Workflow.Test/Client/WorkflowMetadataTests.cs index f6cacd523..478091842 100644 --- a/test/Dapr.Workflow.Test/Client/WorkflowMetadataTests.cs +++ b/test/Dapr.Workflow.Test/Client/WorkflowMetadataTests.cs @@ -152,6 +152,6 @@ private sealed class CapturingSerializer : IWorkflowSerializer return (T?)NextGenericResult; } - public object? Deserialize(string? data, Type returnType) => throw new NotSupportedException(); + public object Deserialize(string? data, Type returnType) => throw new NotSupportedException(); } } diff --git a/test/Dapr.Workflow.Test/DaprWorkflowClientBuilderTests.cs b/test/Dapr.Workflow.Test/DaprWorkflowClientBuilderTests.cs new file mode 100644 index 000000000..9d8f01167 --- /dev/null +++ b/test/Dapr.Workflow.Test/DaprWorkflowClientBuilderTests.cs @@ -0,0 +1,229 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Reflection; +using System.Text.Json; +using Dapr.DurableTask.Protobuf; +using Dapr.Workflow.Client; +using Dapr.Workflow.Registration; +using Dapr.Workflow.Serialization; +using Grpc.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; + +namespace Dapr.Workflow.Test; + +public class DaprWorkflowClientBuilderTests +{ + [Fact] + public void Build_ShouldUseServiceProviderGrpcClientLoggerAndSerializer() + { + var grpcClient = CreateGrpcClient(); + var logger = new TestLogger(); + + var services = new ServiceCollection(); + services.AddSingleton(grpcClient); + services.AddSingleton(new StubLoggerFactory(logger)); + + var provider = services.BuildServiceProvider(); + + var serializer = new TrackingSerializer("custom"); + var builder = new DaprWorkflowClientBuilder() + .UseServiceProvider(provider) + .UseSerializer(serializer); + + var client = builder.Build(); + + var inner = GetInnerClient(client); + var usedGrpcClient = GetPrivateField(inner); + var usedLogger = GetPrivateField>(inner); + var innerLogger = UnwrapLogger(usedLogger); + var usedSerializer = GetPrivateField(inner); + + Assert.Same(grpcClient, usedGrpcClient); + Assert.Same(logger, innerLogger); + Assert.Same(serializer, usedSerializer); + } + + [Fact] + public void Build_ShouldUseSerializerFactory_WhenConfigured() + { + var grpcClient = CreateGrpcClient(); + var dependency = new SerializerDependency("dep"); + + var services = new ServiceCollection(); + services.AddSingleton(grpcClient); + services.AddSingleton(dependency); + services.AddSingleton(NullLoggerFactory.Instance); + + var provider = services.BuildServiceProvider(); + + SerializerDependency? seenDependency = null; + DependencySerializer? createdSerializer = null; + + var builder = new DaprWorkflowClientBuilder() + .UseServiceProvider(provider) + .UseSerializer(sp => + { + seenDependency = sp.GetRequiredService(); + createdSerializer = new DependencySerializer(seenDependency); + return createdSerializer; + }); + + var client = builder.Build(); + var inner = GetInnerClient(client); + var usedSerializer = GetPrivateField(inner); + + Assert.Same(dependency, seenDependency); + Assert.Same(createdSerializer, usedSerializer); + } + + [Fact] + public void Build_ShouldUseDefaultJsonSerializer_WhenNoSerializerConfigured() + { + var builder = new DaprWorkflowClientBuilder(); + + var client = builder.Build(); + var inner = GetInnerClient(client); + var serializer = GetPrivateField(inner); + + var json = Assert.IsType(serializer); + var payload = json.Serialize(new { MyValue = 1 }); + + Assert.Contains("\"myValue\"", payload); + } + + [Fact] + public void Build_ShouldUseJsonSerializerOptions_WhenConfigured() + { + var options = new JsonSerializerOptions { PropertyNamingPolicy = null }; + var builder = new DaprWorkflowClientBuilder().UseJsonSerializer(options); + + var client = builder.Build(); + var inner = GetInnerClient(client); + var serializer = GetPrivateField(inner); + + var json = Assert.IsType(serializer); + var payload = json.Serialize(new { MyValue = 1 }); + + Assert.Contains("\"MyValue\"", payload); + } + + [Fact] + public void Build_ShouldFallbackToNewGrpcClient_WhenServiceProviderHasNoGrpcClient() + { + var services = new ServiceCollection(); + services.AddSingleton(NullLoggerFactory.Instance); + + var provider = services.BuildServiceProvider(); + + var builder = new DaprWorkflowClientBuilder().UseServiceProvider(provider); + + var client = builder.Build(); + var inner = GetInnerClient(client); + var grpcClient = GetPrivateField(inner); + + Assert.NotNull(grpcClient); + } + + private static TaskHubSidecarService.TaskHubSidecarServiceClient CreateGrpcClient() + { + var callInvoker = new Mock(MockBehavior.Loose); + return new TaskHubSidecarService.TaskHubSidecarServiceClient(callInvoker.Object); + } + + private static WorkflowClient GetInnerClient(DaprWorkflowClient client) + { + var field = typeof(DaprWorkflowClient).GetField("_innerClient", BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(field); + return (WorkflowClient)field.GetValue(client)!; + } + + private static T GetPrivateField(object instance) + { + var field = instance + .GetType() + .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) + .FirstOrDefault(f => typeof(T).IsAssignableFrom(f.FieldType)); + + Assert.NotNull(field); + var value = field.GetValue(instance); + Assert.NotNull(value); + return (T)value; + } + + private static ILogger UnwrapLogger(ILogger logger) + { + var field = logger + .GetType() + .GetFields(BindingFlags.NonPublic | BindingFlags.Instance) + .FirstOrDefault(f => typeof(ILogger).IsAssignableFrom(f.FieldType)); + + return field?.GetValue(logger) as ILogger ?? logger; + } + + private sealed class TrackingSerializer(string name) : IWorkflowSerializer + { + private string Name { get; } = name; + + public string Serialize(object? value, Type? inputType = null) => Name; + public T? Deserialize(string? data) => default; + public object? Deserialize(string? data, Type returnType) => null; + } + + private sealed record SerializerDependency(string Value); + + private sealed class DependencySerializer(SerializerDependency dep) : IWorkflowSerializer + { + private SerializerDependency Dependency { get; } = dep; + + public string Serialize(object? value, Type? inputType = null) => Dependency.Value; + public T? Deserialize(string? data) => default; + public object? Deserialize(string? data, Type returnType) => null; + } + + private sealed class TestLogger : ILogger + { + public IDisposable BeginScope(TState state) where TState : notnull => NoopScope.Instance; + public bool IsEnabled(LogLevel logLevel) => true; + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, + Func formatter) + { + } + } + + private sealed class StubLoggerFactory(ILogger logger) : ILoggerFactory + { + private ILogger Logger { get; } = logger; + + public void AddProvider(ILoggerProvider provider) + { + } + + public ILogger CreateLogger(string categoryName) => Logger; + + public void Dispose() + { + } + } + + private sealed class NoopScope : IDisposable + { + public static NoopScope Instance { get; } = new(); + public void Dispose() + { + } + } +} diff --git a/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs b/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs index 7f8264fa7..99cb14353 100644 --- a/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs +++ b/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs @@ -113,7 +113,7 @@ public async Task GetWorkflowStateAsync_ShouldReturnWorkflowState_WhenInnerRetur var state = await client.GetWorkflowStateAsync("i", cancellation: TestContext.Current.CancellationToken); Assert.NotNull(state); - Assert.True(state!.Exists); + Assert.True(state.Exists); Assert.True(state.IsWorkflowRunning); Assert.Equal(WorkflowRuntimeStatus.Running, state.RuntimeStatus); } @@ -158,6 +158,63 @@ public async Task WaitForWorkflowCompletionAsync_ShouldThrowArgumentException_Wh await Assert.ThrowsAsync(() => client.WaitForWorkflowCompletionAsync("", cancellation: TestContext.Current.CancellationToken)); } + [Fact] + public async Task WaitForWorkflowCompletionAsync_ShouldFetchInputsAndOutputs_WhenRequested() + { + var completionMetadata = new WorkflowMetadata( + InstanceId: "i", + Name: "wf", + RuntimeStatus: WorkflowRuntimeStatus.Completed, + CreatedAt: DateTime.MinValue, + LastUpdatedAt: DateTime.MinValue, + Serializer: new Serialization.JsonWorkflowSerializer()); + + var metadataWithInputs = new WorkflowMetadata( + InstanceId: "i", + Name: "wf", + RuntimeStatus: WorkflowRuntimeStatus.Completed, + CreatedAt: DateTime.MinValue, + LastUpdatedAt: DateTime.MinValue, + Serializer: new Serialization.JsonWorkflowSerializer()); + + var inner = new CapturingWorkflowClient + { + WaitForCompletionResult = completionMetadata, + GetWorkflowMetadataResult = metadataWithInputs + }; + var client = new DaprWorkflowClient(inner); + + var state = await client.WaitForWorkflowCompletionAsync("i", getInputsAndOutputs: true, cancellation: TestContext.Current.CancellationToken); + + Assert.True(state.Exists); + Assert.Equal("i", inner.LastWaitForCompletionInstanceId); + Assert.False(inner.LastWaitForCompletionGetInputsAndOutputs); + Assert.Equal("i", inner.LastGetMetadataInstanceId); + Assert.True(inner.LastGetMetadataGetInputsAndOutputs); + } + + [Fact] + public async Task WaitForWorkflowCompletionAsync_ShouldNotFetchInputsAndOutputs_WhenNotRequested() + { + var completionMetadata = new WorkflowMetadata( + InstanceId: "i", + Name: "wf", + RuntimeStatus: WorkflowRuntimeStatus.Completed, + CreatedAt: DateTime.MinValue, + LastUpdatedAt: DateTime.MinValue, + Serializer: new Serialization.JsonWorkflowSerializer()); + + var inner = new CapturingWorkflowClient { WaitForCompletionResult = completionMetadata }; + var client = new DaprWorkflowClient(inner); + + var state = await client.WaitForWorkflowCompletionAsync("i", getInputsAndOutputs: false, cancellation: TestContext.Current.CancellationToken); + + Assert.True(state.Exists); + Assert.Equal("i", inner.LastWaitForCompletionInstanceId); + Assert.False(inner.LastWaitForCompletionGetInputsAndOutputs); + Assert.Null(inner.LastGetMetadataInstanceId); + } + [Fact] public async Task RaiseEventAsync_ShouldValidateParameters_AndForwardToInner() { @@ -290,6 +347,9 @@ private sealed class CapturingWorkflowClient : WorkflowClient public WorkflowMetadata WaitForStartResult { get; set; } = new("i", "wf", WorkflowRuntimeStatus.Running, DateTime.MinValue, DateTime.MinValue, new Serialization.JsonWorkflowSerializer()); + public string? LastWaitForCompletionInstanceId { get; private set; } + public bool LastWaitForCompletionGetInputsAndOutputs { get; private set; } + public CancellationToken LastWaitForCompletionCancellationToken { get; private set; } public WorkflowMetadata WaitForCompletionResult { get; set; } = new("i", "wf", WorkflowRuntimeStatus.Completed, DateTime.MinValue, DateTime.MinValue, new Serialization.JsonWorkflowSerializer()); @@ -362,7 +422,12 @@ public override Task WaitForWorkflowCompletionAsync( string instanceId, bool getInputsAndOutputs = true, CancellationToken cancellationToken = default) - => Task.FromResult(WaitForCompletionResult); + { + LastWaitForCompletionInstanceId = instanceId; + LastWaitForCompletionGetInputsAndOutputs = getInputsAndOutputs; + LastWaitForCompletionCancellationToken = cancellationToken; + return Task.FromResult(WaitForCompletionResult); + } public override Task RaiseEventAsync( string instanceId, diff --git a/test/Dapr.Workflow.Test/ParallelExtensionsTest.cs b/test/Dapr.Workflow.Test/ParallelExtensionsTest.cs index 2bd2fdbf8..c2f7d40eb 100644 --- a/test/Dapr.Workflow.Test/ParallelExtensionsTest.cs +++ b/test/Dapr.Workflow.Test/ParallelExtensionsTest.cs @@ -164,8 +164,7 @@ await context.ProcessInParallelAsync( } return 0; - }, - maxConcurrency); + }); // Assert Assert.True(maxObservedConcurrency <= maxConcurrency, @@ -178,8 +177,8 @@ public async Task ProcessInParallelAsync_ShouldAggregateExceptions_WhenTasksFail { // Arrange var context = new FakeWorkflowContext(); - var inputs = new[] { 1, 2, 3, 4, 5 }; - var expectedMessage = "Test exception"; + int[] inputs = [1, 2, 3, 4, 5]; + const string expectedMessage = "Test exception"; // Act & Assert var ex = await Assert.ThrowsAsync(async () => @@ -205,11 +204,11 @@ public async Task ProcessInParallelAsync_ShouldAggregateExceptions_WhenFactoryTh { // Arrange var context = new FakeWorkflowContext(); - var inputs = new[] { 1, 2, 3 }; + int[] inputs = [1, 2, 3]; // Act & Assert var ex = await Assert.ThrowsAsync(async () => - await context.ProcessInParallelAsync( + await context.ProcessInParallelAsync( inputs, i => { @@ -251,18 +250,6 @@ public async Task ProcessInParallelAsync_WithInputCountGreaterThanMaxConcurrency Assert.Equal(count, processedCount); Assert.Equal(inputs, results); } - - private class TestInput - { - public int Id { get; set; } - public string Value { get; set; } = string.Empty; - } - - private class TestOutput - { - public int ProcessedId { get; set; } - public string ProcessedValue { get; set; } = string.Empty; - } private sealed class FakeWorkflowContext : WorkflowContext { diff --git a/test/Dapr.Workflow.Test/Serialziation/JsonWorkflowSerializerTests.cs b/test/Dapr.Workflow.Test/Serialziation/JsonWorkflowSerializerTests.cs index 88834fed4..cc70310e6 100644 --- a/test/Dapr.Workflow.Test/Serialziation/JsonWorkflowSerializerTests.cs +++ b/test/Dapr.Workflow.Test/Serialziation/JsonWorkflowSerializerTests.cs @@ -139,7 +139,7 @@ public void SerializeAndDeserialize_Generic_ShouldRoundTripObject() var roundTripped = serializer.Deserialize(json); Assert.NotNull(roundTripped); - Assert.Equal(original.Id, roundTripped!.Id); + Assert.Equal(original.Id, roundTripped.Id); Assert.Equal(original.Name, roundTripped.Name); Assert.NotNull(roundTripped.Nested); Assert.Equal(original.Nested.FirstName, roundTripped.Nested.FirstName); diff --git a/test/Dapr.Workflow.Test/Versioning/WorkflowVersionTrackerTests.cs b/test/Dapr.Workflow.Test/Versioning/WorkflowVersionTrackerTests.cs index 080b59467..08b37d07c 100644 --- a/test/Dapr.Workflow.Test/Versioning/WorkflowVersionTrackerTests.cs +++ b/test/Dapr.Workflow.Test/Versioning/WorkflowVersionTrackerTests.cs @@ -25,8 +25,8 @@ public void Constructor_ExtractsPatchesFromHistory() // Arrange var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "patch1" } } } }, - new() { OrchestratorStarted = new() { Version = new() { Patches = { "patch2" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "patch1" } } } }, + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "patch2" } } } } }; // Act @@ -42,8 +42,8 @@ public void Constructor_ExtractsPatchesFromHistory_PreservingOrder() // Arrange var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "patch1" } } } }, - new() { OrchestratorStarted = new() { Version = new() { Patches = { "patch2" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "patch1" } } } }, + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "patch2" } } } } }; // Act @@ -59,7 +59,7 @@ public void Constructor_ExtractsPatchesFromHistory_AllowsDuplicates() // Arrange var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "p1", "p1", "p2" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "p1", "p1", "p2" } } } } }; // Act @@ -86,7 +86,7 @@ public void RequestPatch_Replay_ReturnsTrueOnlyIfInHistory() { var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "patch1" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "patch1" } } } } }; var tracker = new WorkflowVersionTracker(history); @@ -115,7 +115,7 @@ public void RequestPatch_Replay_UsesHistoryMembership() { var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "p1", "p2" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "p1", "p2" } } } } }; var tracker = new WorkflowVersionTracker(history); @@ -130,7 +130,7 @@ public void RequestPatch_Replay_OrderMismatch_DoesNotStall() { var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "p1", "p2" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "p1", "p2" } } } } }; var tracker = new WorkflowVersionTracker(history); @@ -183,7 +183,7 @@ public void RequestPatch_WhenAlreadyDeterminedFalse_ReturnsFalse() { var history = new List { - new() { OrchestratorStarted = new() { Version = new() { Patches = { "p1" } } } } + new() { OrchestratorStarted = new OrchestratorStartedEvent { Version = new OrchestrationVersion { Patches = { "p1" } } } } }; var tracker = new WorkflowVersionTracker(history); diff --git a/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs b/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs index 286219b03..a4034deaf 100644 --- a/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs @@ -12,6 +12,7 @@ // ------------------------------------------------------------------------ using System.Diagnostics; +using System.Reflection; using Dapr.DurableTask.Protobuf; using Dapr.Workflow.Worker.Grpc; using Grpc.Core; @@ -22,9 +23,6 @@ namespace Dapr.Workflow.Test.Worker.Grpc; public sealed class GrpcProtocolHandlerTests { - private static TaskCompletionSource CreateTcs() => - new(TaskCreationOptions.RunContinuationsAsynchronously); - private static TaskCompletionSource CreateTcs() => new(TaskCreationOptions.RunContinuationsAsynchronously); private static async Task RunHandlerUntilAsync( @@ -148,7 +146,7 @@ await RunHandlerUntilAsync( timeout: TimeSpan.FromSeconds(2)); var completed = await completedTcs.Task; - Assert.Equal("i-1", completed!.InstanceId); + Assert.Equal("i-1", completed.InstanceId); } [Fact] @@ -702,6 +700,19 @@ await RunHandlerUntilAsync( Assert.True(Volatile.Read(ref getWorkItemsCalls) >= 1); } + + [Fact] + public async Task DelayOrStopAsync_ShouldSwallowCancellation_WhenTokenIsCanceled() + { + var method = typeof(GrpcProtocolHandler).GetMethod("DelayOrStopAsync", BindingFlags.NonPublic | BindingFlags.Static); + Assert.NotNull(method); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var task = (Task)method.Invoke(null, new object?[] { TimeSpan.FromMilliseconds(1), cts.Token })!; + await task; + } private static AsyncServerStreamingCall CreateServerStreamingCallFromReader(IAsyncStreamReader reader) { diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index 7d2481bf9..067ef830d 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -21,6 +21,7 @@ using Dapr.Workflow.Worker.Grpc; using Dapr.Workflow.Worker.Internal; using Grpc.Core; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -348,9 +349,9 @@ public async Task ExecuteAsync_ShouldSwallowOperationCanceledException_WhenStopp var grpcClientMock = CreateGrpcClientMock(); grpcClientMock .Setup(x => x.GetWorkItems(It.IsAny(), It.IsAny())) - .Returns((GetWorkItemsRequest _, CallOptions options) => + .Returns((GetWorkItemsRequest _, CallOptions opt) => { - options.CancellationToken.ThrowIfCancellationRequested(); + opt.CancellationToken.ThrowIfCancellationRequested(); return CreateServerStreamingCall(EmptyWorkItems()); }); @@ -367,6 +368,91 @@ public async Task ExecuteAsync_ShouldSwallowOperationCanceledException_WhenStopp await InvokeExecuteAsync(worker, cts.Token); } + + [Fact] + public async Task ExecuteAsync_ShouldRethrow_WhenOptionsHaveInvalidConcurrency() + { + var services = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + // Bypass property validation to simulate corrupted configuration. + typeof(WorkflowRuntimeOptions) + .GetField("_maxConcurrentWorkflows", BindingFlags.Instance | BindingFlags.NonPublic)! + .SetValue(options, 0); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + services, + options); + + await Assert.ThrowsAsync(() => InvokeExecuteAsync(worker, CancellationToken.None)); + } + + [Fact] + public void CreateCallOptions_ShouldIncludeUserAgentAndApiToken_WhenConfigured() + { + var services = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["DAPR_API_TOKEN"] = "workflow-token" + }) + .Build(); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + services, + options, + configuration); + + using var cts = new CancellationTokenSource(); + var callOptions = InvokeCreateCallOptions(worker, cts.Token); + + Assert.Equal(cts.Token, callOptions.CancellationToken); + Assert.True(HasHeader(callOptions, "User-Agent", out var userAgent)); + Assert.Contains("dapr-sdk-dotnet", userAgent, StringComparison.OrdinalIgnoreCase); + Assert.True(HasHeader(callOptions, "dapr-api-token", out var tokenValue)); + Assert.Equal("workflow-token", tokenValue); + } + + [Fact] + public void CreateCallOptions_ShouldNotIncludeApiTokenHeader_WhenTokenIsEmpty() + { + var services = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["DAPR_API_TOKEN"] = "" + }) + .Build(); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + services, + options, + configuration); + + var callOptions = InvokeCreateCallOptions(worker, CancellationToken.None); + + Assert.False(HasHeader(callOptions, "dapr-api-token", out _)); + Assert.True(HasHeader(callOptions, "User-Agent", out _)); + } [Fact] public async Task CallChildWorkflowAsync_ShouldComplete_WhenCompletionEventArrivesLater() @@ -378,7 +464,7 @@ public async Task CallChildWorkflowAsync_ShouldComplete_WhenCompletionEventArriv instanceId: "parent", currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), workflowSerializer: serializer, - loggerFactory: NullLoggerFactory.Instance, new WorkflowVersionTracker([]), null); + loggerFactory: NullLoggerFactory.Instance, new WorkflowVersionTracker([])); var task = context.CallChildWorkflowAsync("ChildWf"); @@ -442,7 +528,7 @@ public async Task CallChildWorkflowAsync_ShouldComplete_WhenCompletionArrivedBef var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var context = new WorkflowOrchestrationContext( "wf", "parent", new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), - serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([]), null); + serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([])); var completionEvent = new[] { @@ -482,7 +568,7 @@ public async Task CallChildWorkflowAsync_ShouldIgnoreDuplicateCompletionEvents() var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var context = new WorkflowOrchestrationContext( "wf", "parent", new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), - serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([]), null); + serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([])); var task = context.CallChildWorkflowAsync("ChildWf"); @@ -601,7 +687,7 @@ public async Task CallChildWorkflowAsync_ShouldOnlyCompleteAfterCreation_WhenCom var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var context = new WorkflowOrchestrationContext( "wf", "parent", new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), - serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([]), null); + serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([])); var completionHistory = new[] { @@ -635,7 +721,7 @@ public async Task CallChildWorkflowAsync_ShouldCompleteOnlyForMatchingTaskSchedu var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var context = new WorkflowOrchestrationContext( "wf", "parent", new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), - serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([]), null); + serializer, NullLoggerFactory.Instance, new WorkflowVersionTracker([])); var task = context.CallChildWorkflowAsync("ChildWf"); @@ -1099,7 +1185,7 @@ public async Task HandleOrchestratorResponseAsync_ShouldAdvanceCurrentUtcDateTim var factory = new StubWorkflowsFactory(); factory.AddWorkflow("wf", new InlineWorkflow( inputType: typeof(int), - run: async (ctx, input) => + run: async (ctx, _) => { Assert.Equal(beginDateTime, ctx.CurrentUtcDateTime); await ctx.CreateTimer(TimeSpan.FromSeconds(5)); @@ -1173,7 +1259,7 @@ public async Task HandleOrchestratorResponseAsync_ShouldCompleted_WhenEventRecei var factory = new StubWorkflowsFactory(); factory.AddWorkflow("wf", new InlineWorkflow( inputType: typeof(int), - run: async (ctx, input) => + run: async (ctx, _) => { await ctx.WaitForExternalEventAsync("MyEvent", TimeSpan.FromSeconds(5)); return null; @@ -1256,7 +1342,7 @@ public async Task HandleOrchestratorResponseAsync_ShouldReturnFailureDetails_Whe var factory = new StubWorkflowsFactory(); factory.AddWorkflow("wf", new InlineWorkflow( inputType: typeof(int), - run: async (ctx, input) => + run: async (ctx, _) => { await ctx.WaitForExternalEventAsync("MyEvent", TimeSpan.FromSeconds(5)); return null; @@ -1338,7 +1424,7 @@ public async Task HandleActivityResponseAsync_ShouldUseEmptyInstanceId_WhenOrche var factory = new StubWorkflowsFactory(); factory.AddActivity("act", new InlineActivity( inputType: typeof(int), - run: (_, __) => Task.FromResult(null))); // null output -> empty string result + run: (_, _) => Task.FromResult(null))); // null output -> empty string result var worker = new WorkflowWorker( CreateGrpcClientMock().Object, @@ -1371,16 +1457,30 @@ private static async Task InvokeExecuteAsync(WorkflowWorker worker, Cancellation var method = typeof(WorkflowWorker).GetMethod("ExecuteAsync", BindingFlags.Instance | BindingFlags.NonPublic); Assert.NotNull(method); - var task = (Task)method!.Invoke(worker, [token])!; + var task = (Task)method.Invoke(worker, [token])!; await task; } + private static CallOptions InvokeCreateCallOptions(WorkflowWorker worker, CancellationToken token) + { + var method = typeof(WorkflowWorker).GetMethod("CreateCallOptions", BindingFlags.Instance | BindingFlags.NonPublic); + Assert.NotNull(method); + return (CallOptions)method.Invoke(worker, [token])!; + } + + private static bool HasHeader(CallOptions options, string key, out string? value) + { + var entry = options.Headers?.FirstOrDefault(header => string.Equals(header.Key, key, StringComparison.OrdinalIgnoreCase)); + value = entry?.Value; + return entry is not null; + } + private static async Task InvokeHandleOrchestratorResponseAsync(WorkflowWorker worker, OrchestratorRequest request) { var method = typeof(WorkflowWorker).GetMethod("HandleOrchestratorResponseAsync", BindingFlags.Instance | BindingFlags.NonPublic); Assert.NotNull(method); - var task = (Task)method!.Invoke(worker, [request, CompletionTokenValue])!; + var task = (Task)method.Invoke(worker, [request, CompletionTokenValue])!; return await task; } @@ -1389,7 +1489,7 @@ private static async Task InvokeHandleActivityResponseAsync(Wo var method = typeof(WorkflowWorker).GetMethod("HandleActivityResponseAsync", BindingFlags.Instance | BindingFlags.NonPublic); Assert.NotNull(method); - var task = (Task)method!.Invoke(worker, [request, CompletionTokenValue])!; + var task = (Task)method.Invoke(worker, [request, CompletionTokenValue])!; return await task; } diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowsFactoryTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowsFactoryTests.cs index a68af8b42..9448362d7 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowsFactoryTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowsFactoryTests.cs @@ -39,7 +39,7 @@ public void RegisterWorkflow_Generic_ShouldDefaultNameToTypeName_AndCreateViaDI( Assert.True(created); Assert.NotNull(workflow); Assert.IsType(workflow); - Assert.Equal("dep-1", ((TestWorkflowWithDependency)workflow!).Dep.Value); + Assert.Equal("dep-1", ((TestWorkflowWithDependency)workflow).Dep.Value); } [Fact] @@ -59,7 +59,7 @@ public void RegisterActivity_Generic_ShouldDefaultNameToTypeName_AndCreateViaDI( Assert.True(created); Assert.NotNull(activity); Assert.IsType(activity); - Assert.Equal("dep-2", ((TestActivityWithDependency)activity!).Dep.Value); + Assert.Equal("dep-2", ((TestActivityWithDependency)activity).Dep.Value); } [Fact] @@ -184,7 +184,7 @@ public async Task RegisterWorkflow_Function_ShouldNotOverwrite_WhenRegisteringSa Assert.True(factory.TryCreateWorkflow(new TaskIdentifier("wf"), sp, out var workflow)); Assert.NotNull(workflow); - var result = await workflow!.RunAsync(new FakeWorkflowContext(), 10); + var result = await workflow.RunAsync(new FakeWorkflowContext(), 10); Assert.Equal(11, result); } @@ -202,7 +202,7 @@ public async Task RegisterActivity_Function_ShouldNotOverwrite_WhenRegisteringSa Assert.True(factory.TryCreateActivity(new TaskIdentifier("act"), sp, out var activity)); Assert.NotNull(activity); - var result = await activity!.RunAsync(new FakeActivityContext(), 10); + var result = await activity.RunAsync(new FakeActivityContext(), 10); Assert.Equal(11, result); } @@ -299,7 +299,7 @@ public async Task RegisteredFunctionWorkflow_ShouldInvokeImplementation_AndUseTy Assert.True(created); Assert.NotNull(workflow); - Assert.Equal(typeof(int), workflow!.InputType); + Assert.Equal(typeof(int), workflow.InputType); Assert.Equal(typeof(string), workflow.OutputType); var result = await workflow.RunAsync(new FakeWorkflowContext(), 7); @@ -320,7 +320,7 @@ public async Task RegisteredFunctionActivity_ShouldInvokeImplementation_AndUseTy Assert.True(created); Assert.NotNull(activity); - Assert.Equal(typeof(int), activity!.InputType); + Assert.Equal(typeof(int), activity.InputType); Assert.Equal(typeof(string), activity.OutputType); var result = await activity.RunAsync(new FakeActivityContext(), 7); diff --git a/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs b/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs index 933ace1b6..5b11e0325 100644 --- a/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs @@ -1,5 +1,4 @@ using System.Text.Json; -using System.Collections.Generic; using Dapr.DurableTask.Protobuf; using Dapr.Workflow.Abstractions; using Dapr.Workflow.Client; @@ -113,6 +112,24 @@ public void AddDaprWorkflowBuilder_WithJsonSerializer_ShouldReplaceDefaultSerial Assert.IsType(serializer); } + [Fact] + public void AddDaprWorkflowBuilder_WithSerializerInstance_ShouldReplaceDefaultSerializer() + { + var services = new ServiceCollection(); + services.AddLogging(); + + var serializer = new MockSerializer(); + + services + .AddDaprWorkflowBuilder(_ => { }) + .WithSerializer(serializer); + + var sp = services.BuildServiceProvider(); + var resolved = sp.GetRequiredService(); + + Assert.Same(serializer, resolved); + } + [Fact] public void AddDaprWorkflowBuilder_WithSerializerFactory_ShouldResolveDependenciesFromServiceProvider() { @@ -158,8 +175,7 @@ public void AddDaprWorkflow_ShouldApplyGrpcChannelOptionsIntoGrpcClientFactoryOp var clientType = typeof(TaskHubSidecarService.TaskHubSidecarServiceClient); var grpcOptions = - monitor.Get(clientType.FullName!) - ?? monitor.Get(clientType.Name); + monitor.Get(clientType.FullName!); if (grpcOptions.ChannelOptionsActions.Count == 0) { @@ -198,13 +214,13 @@ public async Task AddDaprWorkflow_ShouldCreateWorkflowsFactory_AndApplyRegistrat Assert.True(factory.TryCreateWorkflow(new TaskIdentifier("wf"), sp, out var wf)); Assert.NotNull(wf); - var wfResult = await wf!.RunAsync(new FakeWorkflowContext(), 10); + var wfResult = await wf.RunAsync(new FakeWorkflowContext(), 10); Assert.Equal(11, wfResult); Assert.True(factory.TryCreateActivity(new TaskIdentifier("act"), sp, out var act)); Assert.NotNull(act); - var actResult = await act!.RunAsync(new FakeActivityContext(), 10); + var actResult = await act.RunAsync(new FakeActivityContext(), 10); Assert.Equal(12, actResult); } @@ -263,8 +279,7 @@ public void AddDaprWorkflowClient_WithGrpcMessageSizeLimits_ShouldApplyIntoGrpcC var clientType = typeof(TaskHubSidecarService.TaskHubSidecarServiceClient); var grpcOptions = - monitor.Get(clientType.FullName!) - ?? monitor.Get(clientType.Name); + monitor.Get(clientType.FullName!); if (grpcOptions.ChannelOptionsActions.Count == 0) { @@ -284,6 +299,77 @@ public void AddDaprWorkflowClient_WithGrpcMessageSizeLimits_ShouldApplyIntoGrpcC Assert.Equal(8765, channelOptions.MaxSendMessageSize); } + [Fact] + public void WithGrpcMessageSizeLimits_ShouldNotModifyOptions_WhenNoValuesProvided() + { + var baselineServices = new ServiceCollection(); + baselineServices.AddLogging(); + baselineServices.AddDaprWorkflowClient(); + + using var baselineProvider = baselineServices.BuildServiceProvider(); + var baselineCount = GetGrpcChannelOptionsActionCount(baselineProvider); + + var services = new ServiceCollection(); + services.AddLogging(); + + services + .AddDaprWorkflowClient() + .WithGrpcMessageSizeLimits(); + + using var provider = services.BuildServiceProvider(); + var count = GetGrpcChannelOptionsActionCount(provider); + + Assert.Equal(baselineCount, count); + } + + [Fact] + public void WithGrpcMessageSizeLimits_ShouldApplyOnlyReceiveLimit_WhenSendIsNull() + { + var baselineServices = new ServiceCollection(); + baselineServices.AddLogging(); + baselineServices.AddDaprWorkflowClient(); + + using var baselineProvider = baselineServices.BuildServiceProvider(); + var baseline = ApplyGrpcOptions(baselineProvider); + + var services = new ServiceCollection(); + services.AddLogging(); + + services + .AddDaprWorkflowClient() + .WithGrpcMessageSizeLimits(maxReceiveMessageSize: 1111); + + using var provider = services.BuildServiceProvider(); + var channelOptions = ApplyGrpcOptions(provider); + + Assert.Equal(1111, channelOptions.MaxReceiveMessageSize); + Assert.Equal(baseline.MaxSendMessageSize, channelOptions.MaxSendMessageSize); + } + + [Fact] + public void WithGrpcMessageSizeLimits_ShouldApplyOnlySendLimit_WhenReceiveIsNull() + { + var baselineServices = new ServiceCollection(); + baselineServices.AddLogging(); + baselineServices.AddDaprWorkflowClient(); + + using var baselineProvider = baselineServices.BuildServiceProvider(); + var baseline = ApplyGrpcOptions(baselineProvider); + + var services = new ServiceCollection(); + services.AddLogging(); + + services + .AddDaprWorkflowClient() + .WithGrpcMessageSizeLimits(maxSendMessageSize: 2222); + + using var provider = services.BuildServiceProvider(); + var channelOptions = ApplyGrpcOptions(provider); + + Assert.Equal(baseline.MaxReceiveMessageSize, channelOptions.MaxReceiveMessageSize); + Assert.Equal(2222, channelOptions.MaxSendMessageSize); + } + [Theory] [InlineData(0, 1024)] [InlineData(1024, 0)] @@ -312,7 +398,7 @@ public void AddDaprWorkflow_ShouldRegisterDaprWorkflowClient_WithConfiguredLifet var descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(DaprWorkflowClient)); Assert.NotNull(descriptor); - Assert.Equal(lifetime, descriptor!.Lifetime); + Assert.Equal(lifetime, descriptor.Lifetime); } [Fact] @@ -458,4 +544,41 @@ private sealed class MockSerializer : IWorkflowSerializer public T? Deserialize(string? data) => default; public object? Deserialize(string? data, Type returnType) => null; } + + private static int GetGrpcChannelOptionsActionCount(ServiceProvider provider) + => GetGrpcOptionsCandidates(provider).Sum(options => options.ChannelOptionsActions.Count); + + private static GrpcChannelOptions ApplyGrpcOptions(ServiceProvider provider) + { + var channelOptions = new GrpcChannelOptions(); + + foreach (var grpcOptions in GetGrpcOptionsCandidates(provider)) + { + foreach (var action in grpcOptions.ChannelOptionsActions) + { + action(channelOptions); + } + } + + return channelOptions; + } + + private static IEnumerable GetGrpcOptionsCandidates(ServiceProvider provider) + { + var monitor = provider.GetRequiredService>(); + var clientType = typeof(TaskHubSidecarService.TaskHubSidecarServiceClient); + + var fullNameOptions = monitor.Get(clientType.FullName!); + var nameOptions = monitor.Get(clientType.Name); + + if (fullNameOptions is not null) + { + yield return fullNameOptions; + } + + if (nameOptions is not null && !ReferenceEquals(nameOptions, fullNameOptions)) + { + yield return nameOptions; + } + } }