Skip to content
16 changes: 15 additions & 1 deletion src/Dapr.Testcontainers/Common/Options/DaprRuntimeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// ------------------------------------------------------------------------

using System;
using Dapr.Testcontainers.Common;

namespace Dapr.Testcontainers.Common.Options;

Expand Down Expand Up @@ -64,6 +63,11 @@ public DaprRuntimeOptions(string version = "latest")
/// The ID of the test application.
/// </summary>
public string AppId { get; private set; } = $"test-app-{Guid.NewGuid():N}";

/// <summary>
/// The Dapr API token used to secure communications with the sidecar.
/// </summary>
public string? DaprApiToken { get; private set; }

/// <summary>
/// The level of Dapr logs to show.
Expand Down Expand Up @@ -119,6 +123,16 @@ public DaprRuntimeOptions WithAppId(string appId)
return this;
}

/// <summary>
/// Sets the Dapr API token used to secure communications with the sidecar.
/// </summary>
/// <param name="daprApiToken">The API token to use.</param>
public DaprRuntimeOptions WithDaprApiToken(string daprApiToken)
{
DaprApiToken = daprApiToken;
return this;
}

/// <summary>
/// Enables container log capture to files.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ public DaprdContainer(
containerBuilder = containerBuilder.WithOutputConsumer(_logAttachment.OutputConsumer);
}

// Put the API token in an envvar so it can be picked up by the Dapr runtime at startup
if (!string.IsNullOrWhiteSpace(options.DaprApiToken))
{
containerBuilder = containerBuilder.WithEnvironment("DAPR_API_TOKEN", options.DaprApiToken);
}

containerBuilder = daprHttpPort is not null ? containerBuilder.WithPortBinding(containerPort: InternalHttpPort, hostPort: daprHttpPort.Value) : containerBuilder.WithPortBinding(port: InternalHttpPort, assignRandomHostPort: true);
containerBuilder = daprGrpcPort is not null ? containerBuilder.WithPortBinding(containerPort: InternalGrpcPort, hostPort: daprGrpcPort.Value) : containerBuilder.WithPortBinding(port: InternalGrpcPort, assignRandomHostPort: true);

Expand Down
32 changes: 23 additions & 9 deletions src/Dapr.Workflow/Client/WorkflowGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Common;
using Dapr.Workflow.Serialization;
using Grpc.Core;
using grpc = Dapr.DurableTask.Protobuf;
Expand All @@ -25,7 +25,11 @@ namespace Dapr.Workflow.Client;
/// <summary>
/// The gRPC-based implementation of the Workflow client.
/// </summary>
internal sealed class WorkflowGrpcClient(grpc.TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient, ILogger<WorkflowGrpcClient> logger, IWorkflowSerializer serializer) : WorkflowClient
internal sealed class WorkflowGrpcClient(
grpc.TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient,
ILogger<WorkflowGrpcClient> logger,
IWorkflowSerializer serializer,
string? daprApiToken = null) : WorkflowClient
{
/// <inheritdoc />
public override async Task<string> ScheduleNewWorkflowAsync(string workflowName, object? input = null, StartWorkflowOptions? options = null,
Expand All @@ -46,7 +50,8 @@ public override async Task<string> ScheduleNewWorkflowAsync(string workflowName,
request.ScheduledStartTimestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(startAt);
}

var response = await grpcClient.StartInstanceAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.StartInstanceAsync(request, grpcCallOptions);
logger.LogScheduleWorkflowSuccess(workflowName, instanceId);
return response.InstanceId;
}
Expand All @@ -63,7 +68,8 @@ public override async Task<string> ScheduleNewWorkflowAsync(string workflowName,

try
{
var response = await grpcClient.GetInstanceAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.GetInstanceAsync(request, grpcCallOptions);

if (!response.Exists)
{
Expand Down Expand Up @@ -142,7 +148,8 @@ public override async Task RaiseEventAsync(string instanceId, string eventName,
InstanceId = instanceId, Name = eventName, Input = SerializeToJson(eventPayload)
};

await grpcClient.RaiseEventAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await grpcClient.RaiseEventAsync(request, grpcCallOptions);
logger.LogRaisedEvent(eventName, instanceId);
}

Expand All @@ -157,7 +164,8 @@ public override async Task TerminateWorkflowAsync(string instanceId, object? out
Recursive = true // Terminate child workflows too
};

await grpcClient.TerminateInstanceAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await grpcClient.TerminateInstanceAsync(request, grpcCallOptions);
logger.LogTerminateWorkflow(instanceId);
}

Expand All @@ -168,7 +176,8 @@ public override async Task SuspendWorkflowAsync(string instanceId, string? reaso

var request = new grpc.SuspendRequest { InstanceId = instanceId, Reason = reason ?? string.Empty };

await grpcClient.SuspendInstanceAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await grpcClient.SuspendInstanceAsync(request, grpcCallOptions);
logger.LogSuspendWorkflow(instanceId);
}

Expand All @@ -183,7 +192,8 @@ public override async Task ResumeWorkflowAsync(string instanceId, string? reason
Reason = reason ?? string.Empty
};

await grpcClient.ResumeInstanceAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await grpcClient.ResumeInstanceAsync(request, grpcCallOptions);
logger.LogResumedWorkflow(instanceId);
}

Expand All @@ -198,7 +208,8 @@ public override async Task<bool> PurgeInstanceAsync(string instanceId, Cancellat
Recursive = true // Purge child workflows too
};

var response = await grpcClient.PurgeInstancesAsync(request, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.PurgeInstancesAsync(request, grpcCallOptions);
var purged = response.DeletedInstanceCount > 0;

if (purged)
Expand All @@ -222,6 +233,9 @@ public override ValueTask DisposeAsync()

private string SerializeToJson(object? obj) => obj == null ? string.Empty : serializer.Serialize(obj);

private CallOptions CreateCallOptions(CancellationToken cancellationToken) =>
DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprWorkflowClient).Assembly, daprApiToken, cancellationToken);

private static bool IsTerminalStatus(WorkflowRuntimeStatus status) =>
status is WorkflowRuntimeStatus.Completed
or WorkflowRuntimeStatus.Failed
Expand Down
1 change: 0 additions & 1 deletion src/Dapr.Workflow/Client/WorkflowMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// ------------------------------------------------------------------------

using System;
using System.Text.Json;
using Dapr.Workflow.Serialization;

namespace Dapr.Workflow.Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public override DaprWorkflowClient Build()
logger = loggerFactory.CreateLogger<WorkflowGrpcClient>();
}

var innerClient = new WorkflowGrpcClient(grpcClient, logger, serializer);
var innerClient = new WorkflowGrpcClient(grpcClient, logger, serializer, DaprApiToken);

return new DaprWorkflowClient(innerClient, DaprApiToken);
}
}
26 changes: 20 additions & 6 deletions src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Common;
using Dapr.DurableTask.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Logging;
Expand All @@ -24,7 +25,12 @@ namespace Dapr.Workflow.Worker.Grpc;
/// <summary>
/// Handles the bidirectional gRPC streaming protocol with the Dapr sidecar.
/// </summary>
internal sealed class GrpcProtocolHandler(TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient, ILoggerFactory loggerFactory, int maxConcurrentWorkItems = 100, int maxConcurrentActivities = 100) : IAsyncDisposable
internal sealed class GrpcProtocolHandler(
TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient,
ILoggerFactory loggerFactory,
int maxConcurrentWorkItems = 100,
int maxConcurrentActivities = 100,
string? daprApiToken = null) : IAsyncDisposable
{
private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

Expand Down Expand Up @@ -66,7 +72,8 @@ public async Task StartAsync(
_logger.LogGrpcProtocolHandlerStartStream();

// Establish the server streaming call
_streamingCall = _grpcClient.GetWorkItems(request, cancellationToken: token);
var grpcCallOptions = CreateCallOptions(token);
_streamingCall = _grpcClient.GetWorkItems(request, grpcCallOptions);

_logger.LogGrpcProtocolHandlerStreamEstablished();

Expand Down Expand Up @@ -202,7 +209,8 @@ private async Task ProcessWorkflowAsync(OrchestratorRequest request, string comp
var result = await handler(request, completionToken);

// Send the result back to Dapr
await _grpcClient.CompleteOrchestratorTaskAsync(result, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await _grpcClient.CompleteOrchestratorTaskAsync(result, grpcCallOptions);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Expand All @@ -213,7 +221,8 @@ private async Task ProcessWorkflowAsync(OrchestratorRequest request, string comp
try
{
var failureResult = CreateWorkflowFailureResult(request, completionToken, ex);
await _grpcClient.CompleteOrchestratorTaskAsync(failureResult, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await _grpcClient.CompleteOrchestratorTaskAsync(failureResult, grpcCallOptions);
}
catch (Exception resultEx)
{
Expand Down Expand Up @@ -241,7 +250,8 @@ private async Task ProcessActivityAsync(ActivityRequest request, string completi
var result = await handler(request, completionToken);

// Send the result back to Dapr
await _grpcClient.CompleteActivityTaskAsync(result, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await _grpcClient.CompleteActivityTaskAsync(result, grpcCallOptions);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Expand All @@ -255,7 +265,8 @@ private async Task ProcessActivityAsync(ActivityRequest request, string completi
try
{
var failureResult = CreateActivityFailureResult(request, completionToken, ex);
await _grpcClient.CompleteActivityTaskAsync(failureResult, cancellationToken: cancellationToken);
var grpcCallOptions = CreateCallOptions(cancellationToken);
await _grpcClient.CompleteActivityTaskAsync(failureResult, grpcCallOptions);
}
catch (Exception resultEx)
{
Expand Down Expand Up @@ -325,4 +336,7 @@ public async ValueTask DisposeAsync()

_logger.LogGrpcProtocolHandlerDisposed();
}

private CallOptions CreateCallOptions(CancellationToken cancellationToken) =>
DaprClientUtilities.ConfigureGrpcCallOptions(typeof(GrpcProtocolHandler).Assembly, daprApiToken, cancellationToken);
}
14 changes: 11 additions & 3 deletions src/Dapr.Workflow/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Common;
using Dapr.DurableTask.Protobuf;
using Dapr.Workflow.Abstractions;
using Dapr.Workflow.Serialization;
using Dapr.Workflow.Versioning;
using Dapr.Workflow.Worker.Grpc;
using Dapr.Workflow.Worker.Internal;
using Grpc.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
Expand All @@ -36,14 +39,16 @@ internal sealed class WorkflowWorker(
ILoggerFactory loggerFactory,
IWorkflowSerializer workflowSerializer,
IServiceProvider serviceProvider,
WorkflowRuntimeOptions options) : BackgroundService
WorkflowRuntimeOptions options,
IConfiguration? configuration = null) : BackgroundService
{
private readonly TaskHubSidecarService.TaskHubSidecarServiceClient _grpcClient = grpcClient ?? throw new ArgumentNullException(nameof(grpcClient));
private readonly IWorkflowsFactory _workflowsFactory = workflowsFactory ?? throw new ArgumentNullException(nameof(workflowsFactory));
private readonly IServiceProvider _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
private readonly ILogger<WorkflowWorker> _logger = loggerFactory.CreateLogger<WorkflowWorker>() ?? throw new ArgumentNullException(nameof(loggerFactory));
private readonly WorkflowRuntimeOptions _options = options ?? throw new ArgumentNullException(nameof(options));
private readonly IWorkflowSerializer _serializer = workflowSerializer ?? throw new ArgumentNullException(nameof(workflowSerializer));
private readonly string? _daprApiToken = DaprDefaults.GetDefaultDaprApiToken(configuration);

private GrpcProtocolHandler? _protocolHandler;

Expand All @@ -57,7 +62,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
try
{
// Create the protocol handler
_protocolHandler = new GrpcProtocolHandler(_grpcClient, loggerFactory, _options.MaxConcurrentWorkflows, _options.MaxConcurrentActivities);
_protocolHandler = new GrpcProtocolHandler(_grpcClient, loggerFactory, _options.MaxConcurrentWorkflows, _options.MaxConcurrentActivities, _daprApiToken);

// Start processing work items
await _protocolHandler.StartAsync(HandleOrchestratorResponseAsync, HandleActivityResponseAsync, stoppingToken);
Expand Down Expand Up @@ -100,7 +105,7 @@ private async Task<OrchestratorResponse> HandleOrchestratorResponseAsync(Orchest
ForWorkItemProcessing = true
};

using var call = _grpcClient.StreamInstanceHistory(streamRequest);
using var call = _grpcClient.StreamInstanceHistory(streamRequest, CreateCallOptions(CancellationToken.None));
while (await call.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false))
{
var chunk = call.ResponseStream.Current.Events;
Expand Down Expand Up @@ -545,4 +550,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)

await base.StopAsync(cancellationToken);
}

private CallOptions CreateCallOptions(CancellationToken cancellationToken) =>
DaprClientUtilities.ConfigureGrpcCallOptions(typeof(WorkflowWorker).Assembly, _daprApiToken, cancellationToken);
}
4 changes: 3 additions & 1 deletion src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,12 @@ private static void AddDaprWorkflowCore(
serviceCollection.TryAddSingleton<WorkflowClient>(sp =>
{
var grpcClient = sp.GetRequiredService<TaskHubSidecarService.TaskHubSidecarServiceClient>();
var configuration = sp.GetService<IConfiguration>();
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<WorkflowGrpcClient>();
var serializer = sp.GetRequiredService<IWorkflowSerializer>();
return new WorkflowGrpcClient(grpcClient, logger, serializer);
var daprApiToken = DaprDefaults.GetDefaultDaprApiToken(configuration);
return new WorkflowGrpcClient(grpcClient, logger, serializer, daprApiToken);
});

// Register gRPC client for communicating with Dapr sidecar
Expand Down
Loading
Loading