diff --git a/Directory.Packages.props b/Directory.Packages.props index 245f7aeb..319feb20 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -5,20 +5,22 @@ - + - + - + + - - + + + @@ -27,12 +29,9 @@ - - + - - - + @@ -40,12 +39,24 @@ - - - + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Elsa.Integrations.sln b/Elsa.Integrations.sln index 14ce290c..4672afcc 100644 --- a/Elsa.Integrations.sln +++ b/Elsa.Integrations.sln @@ -32,6 +32,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Integrations.Telnyx", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Integrations.OrchardCore", "src\Elsa.Integrations.OrchardCore\Elsa.Integrations.OrchardCore.csproj", "{71D93DC7-A455-4EDC-86DB-826CCEECEEF8}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Integrations.AzureServiceBus", "src\Elsa.Integrations.AzureServiceBus\Elsa.Integrations.AzureServiceBus.csproj", "{BC283CFE-D542-4D40-AE7F-6888B876AA2B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -58,6 +60,10 @@ Global {71D93DC7-A455-4EDC-86DB-826CCEECEEF8}.Debug|Any CPU.Build.0 = Debug|Any CPU {71D93DC7-A455-4EDC-86DB-826CCEECEEF8}.Release|Any CPU.ActiveCfg = Release|Any CPU {71D93DC7-A455-4EDC-86DB-826CCEECEEF8}.Release|Any CPU.Build.0 = Release|Any CPU + {BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -68,5 +74,6 @@ Global {861E1230-F9CB-450C-845E-04DFDA259E26} = {A99FA26E-2098-403A-BD04-6BBCFBE3AC7D} {128B2FC3-81A7-4327-9665-9155B05F21DA} = {527248D6-B851-4C8D-8667-E2FB0A91DABF} {71D93DC7-A455-4EDC-86DB-826CCEECEEF8} = {527248D6-B851-4C8D-8667-E2FB0A91DABF} + {BC283CFE-D542-4D40-AE7F-6888B876AA2B} = {527248D6-B851-4C8D-8667-E2FB0A91DABF} EndGlobalSection EndGlobal diff --git a/src/Elsa.Integrations.AzureServiceBus/Activities/MessageReceived.cs b/src/Elsa.Integrations.AzureServiceBus/Activities/MessageReceived.cs new file mode 100644 index 00000000..ab028ae3 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Activities/MessageReceived.cs @@ -0,0 +1,126 @@ +using System.Runtime.CompilerServices; +using Elsa.Common; +using Elsa.Expressions.Models; +using Elsa.Extensions; +using Elsa.Integrations.AzureServiceBus.Models; +using Elsa.Workflows; +using Elsa.Workflows.Attributes; +using Elsa.Workflows.Models; + +namespace Elsa.Integrations.AzureServiceBus.Activities; + +/// +/// Triggered when a message is received on a specified queue or topic and subscription. +/// +[Activity("Elsa.AzureServiceBus", "Azure Service Bus", "Executes when a message is received from the configured queue or topic and subscription")] +public class MessageReceived : Trigger +{ + internal const string InputKey = "TransportMessage"; + + /// + public MessageReceived([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line) + { + } + + /// + public MessageReceived(Input queue) + { + QueueOrTopic = queue; + } + + /// + public MessageReceived(string queue) : this(new Input(queue)) + { + } + + /// + public MessageReceived(Input topic, Input subscription) + { + QueueOrTopic = topic; + Subscription = subscription; + } + + /// + public MessageReceived(string topic, string subscription) : this(new Input(topic), new Input(subscription)) + { + } + + /// + /// The name of the queue or topic to read from. + /// + [Input(Description = "The name of the queue or topic to read from.")] + public Input QueueOrTopic { get; set; } = null!; + + /// + /// The name of the subscription to read from. + /// + [Input(Description = "The name of the subscription to read from.")] + public Input? Subscription { get; set; } + + /// + /// The .NET type to deserialize the message into. Defaults to . + /// + [Input(Description = "The .NET type to deserialize the message into.")] + public Input MessageType { get; set; } = new(typeof(string)); + + /// + /// The received transport message. + /// + [Output(Description = "The received transport message.")] + public Output TransportMessage { get; set; } = null!; + + /// + /// The received transport message. + /// + [Output(Description = "The received message.")] + public Output Message { get; set; } = null!; + + /// + /// The formatter to use to parse the message. + /// + [Input(Description = "The formatter to use to serialize the message.")] + public Input Formatter { get; set; } = null!; + + /// + protected override object GetTriggerPayload(TriggerIndexingContext context) => GetStimulus(context.ExpressionExecutionContext); + + /// + protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) + { + // If we did not receive external input, it means we are just now encountering this activity. + if (context.IsTriggerOfWorkflow()) + { + await Resume(context); + } + else + { + // Create bookmarks for when we receive the expected HTTP request. + context.CreateBookmark(GetStimulus(context.ExpressionExecutionContext), Resume,false); + } + } + + private async ValueTask Resume(ActivityExecutionContext context) + { + var receivedMessage = context.GetWorkflowInput(InputKey); + await SetResultAsync(receivedMessage, context); + await context.CompleteActivityAsync(); + } + + private async Task SetResultAsync(ReceivedServiceBusMessageModel receivedMessage, ActivityExecutionContext context) + { + var bodyAsString = new BinaryData(receivedMessage.Body).ToString(); + var targetType = context.Get(MessageType); + var formatter = Formatter.GetOrDefault(context); + var body = formatter == null ? bodyAsString : await formatter.FromStringAsync(bodyAsString, targetType, context.CancellationToken); + + context.Set(TransportMessage, receivedMessage); + context.Set(Message, body); + } + + private object GetStimulus(ExpressionExecutionContext context) + { + var queueOrTopic = context.Get(QueueOrTopic)!; + var subscription = context.Get(Subscription); + return new MessageReceivedStimulus(queueOrTopic, subscription); + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Activities/SendMessage.cs b/src/Elsa.Integrations.AzureServiceBus/Activities/SendMessage.cs new file mode 100644 index 00000000..4fcc6644 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Activities/SendMessage.cs @@ -0,0 +1,104 @@ +using System.Runtime.CompilerServices; +using System.Text.Json; +using Azure.Messaging.ServiceBus; +using Elsa.Common; +using Elsa.Common.Services; +using Elsa.Extensions; +using Elsa.Workflows; +using Elsa.Workflows.Attributes; +using Elsa.Workflows.Models; +using Elsa.Workflows.UIHints; +using JetBrains.Annotations; + +namespace Elsa.Integrations.AzureServiceBus.Activities; + +/// +/// Sends a message to a queue or topic in Azure Service Bus. +/// +[Activity("Elsa.AzureServiceBus.Send", "Azure Service Bus", "Send a message to a queue or topic")] +[PublicAPI] +public class SendMessage : CodeActivity +{ + /// + public SendMessage([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line) + { + } + + /// + /// The contents of the message to send. + /// + [Input(Description = "The contents of the message to send.")] + public Input MessageBody { get; set; } = null!; + + /// + /// The queue or topic to send the message to. + /// + public Input QueueOrTopic { get; set; } = null!; + + /// + /// The content type of the message. + /// + public Input? ContentType { get; set; } + + /// + /// The subject of the message. + /// + public Input? Subject { get; set; } + + /// + /// The correlation ID of the message. + /// + public Input? CorrelationId { get; set; } + + /// + /// The formatter to use when serializing the message body. + /// + public Input FormatterType { get; set; } = null!; + + /// + /// The application properties to embed with the Service Bus Message + /// + [Input(Category = "Advanced", + DefaultSyntax = "Json", + SupportedSyntaxes = ["JavaScript", "Json"], + UIHint = InputUIHints.MultiLine) + ] + public Input?> ApplicationProperties { get; set; } = null!; + + /// + protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) + { + var queueOrTopic = context.Get(QueueOrTopic); + var messageBody = context.Get(MessageBody); + var cancellationToken = context.CancellationToken; + var serializedMessageBody = await SerializeMessageBodyAsync(context, messageBody!, cancellationToken); + + var message = new ServiceBusMessage(serializedMessageBody) + { + ContentType = context.Get(ContentType), + Subject = context.Get(Subject), + CorrelationId = context.Get(CorrelationId) + }; + + var applicationProperties = ApplicationProperties.GetOrDefault(context); + + if (applicationProperties != null) + foreach (var property in applicationProperties) + message.ApplicationProperties.Add(property.Key, ((JsonElement)property.Value).GetString()); + + var client = context.GetRequiredService(); + await using var sender = client.CreateSender(queueOrTopic); + await sender.SendMessageAsync(message, cancellationToken); + } + + private async ValueTask SerializeMessageBodyAsync(ActivityExecutionContext context, object value, CancellationToken cancellationToken) + { + if (value is string s) return BinaryData.FromString(s); + + var formatterType = FormatterType.GetOrDefault(context) ?? typeof(JsonFormatter); + var formatter = context.GetServices().First(x => x.GetType() == formatterType); + var data = await formatter.ToStringAsync(value, cancellationToken); + + return BinaryData.FromString(data); + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Contracts/IQueueProvider.cs b/src/Elsa.Integrations.AzureServiceBus/Contracts/IQueueProvider.cs new file mode 100644 index 00000000..717580f7 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Contracts/IQueueProvider.cs @@ -0,0 +1,14 @@ +using Elsa.Integrations.AzureServiceBus.Models; + +namespace Elsa.Integrations.AzureServiceBus.Contracts; + +/// +/// Provides queue definitions to the system. +/// +public interface IQueueProvider +{ + /// + /// Returns a list of s. + /// + ValueTask> GetQueuesAsync(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Contracts/IServiceBusInitializer.cs b/src/Elsa.Integrations.AzureServiceBus/Contracts/IServiceBusInitializer.cs new file mode 100644 index 00000000..e6dc36a7 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Contracts/IServiceBusInitializer.cs @@ -0,0 +1,12 @@ +namespace Elsa.Integrations.AzureServiceBus.Contracts; + +/// +/// Creates queues, topics and subscriptions provided by , and implementations. +/// +public interface IServiceBusInitializer +{ + /// + /// Creates queues, topics and subscriptions provided by , and implementations. + /// + Task InitializeAsync(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Contracts/ISubscriptionProvider.cs b/src/Elsa.Integrations.AzureServiceBus/Contracts/ISubscriptionProvider.cs new file mode 100644 index 00000000..dab9785a --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Contracts/ISubscriptionProvider.cs @@ -0,0 +1,16 @@ +using Elsa.Integrations.AzureServiceBus.Models; + +namespace Elsa.Integrations.AzureServiceBus.Contracts; + +/// +/// Provides subscription definitions to the system. +/// +[Obsolete("Use AzureServiceBusOptions.Topics instead.")] + +public interface ISubscriptionProvider +{ + /// + /// Return a list of s. + /// + ValueTask> GetSubscriptionsAsync(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Contracts/ITopicProvider.cs b/src/Elsa.Integrations.AzureServiceBus/Contracts/ITopicProvider.cs new file mode 100644 index 00000000..01a37199 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Contracts/ITopicProvider.cs @@ -0,0 +1,14 @@ +using Elsa.Integrations.AzureServiceBus.Models; + +namespace Elsa.Integrations.AzureServiceBus.Contracts; + +/// +/// Provides topic definitions to the system. +/// +public interface ITopicProvider +{ + /// + /// Returns a list of s. + /// + ValueTask> GetTopicsAsync(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Contracts/IWorkerManager.cs b/src/Elsa.Integrations.AzureServiceBus/Contracts/IWorkerManager.cs new file mode 100644 index 00000000..21c07d60 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Contracts/IWorkerManager.cs @@ -0,0 +1,27 @@ +using Elsa.Integrations.AzureServiceBus.Services; + +namespace Elsa.Integrations.AzureServiceBus.Contracts; + +/// +/// Manages message workers. +/// +public interface IWorkerManager +{ + /// + /// A list of workers under management. + /// + IEnumerable Workers { get; } + + /// + /// Ensures that at least one worker exists for the specified queue/topic and subscription. + /// + Task StartWorkerAsync(string queueOrTopic, string? subscription, CancellationToken cancellationToken = default); + + /// + /// Finds a worker for the specified queue or topic and subscription. + /// + /// The name of the queue or topic. + /// The name of the subscription. + /// The worker, or null if no worker was found. + Worker? FindWorkerFor(string queueOrTopic, string? subscription); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Elsa.Integrations.AzureServiceBus.csproj b/src/Elsa.Integrations.AzureServiceBus/Elsa.Integrations.AzureServiceBus.csproj new file mode 100644 index 00000000..43b4c0a8 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Elsa.Integrations.AzureServiceBus.csproj @@ -0,0 +1,20 @@ + + + + + Provides Azure Service Bus integration and activities. + + elsa module azure-service-bus service-bus + + + + + + + + + + + + + diff --git a/src/Elsa.Integrations.AzureServiceBus/Extensions/ModuleExtensions.cs b/src/Elsa.Integrations.AzureServiceBus/Extensions/ModuleExtensions.cs new file mode 100644 index 00000000..21a1d150 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Extensions/ModuleExtensions.cs @@ -0,0 +1,25 @@ +using Elsa.Features.Services; +using Elsa.Integrations.AzureServiceBus.Features; + +// ReSharper disable once CheckNamespace +namespace Elsa.Extensions; + +/// +/// Adds extension methods to to register Azure Service Bus related services. +/// +public static class ModuleExtensions +{ + /// + /// Enable and configure the feature. + /// + public static IModule UseAzureServiceBus(this IModule module, string connectionStringOrName, Action? setup = null) + { + setup += feature => feature.AzureServiceBusOptions += options => options.ConnectionStringOrName = connectionStringOrName; + return module.Use(setup); + } + + /// + /// Enable and configure the feature. + /// + public static IModule UseAzureServiceBus(this IModule module, Action? setup = null) => module.Use(setup); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Features/AzureServiceBusFeature.cs b/src/Elsa.Integrations.AzureServiceBus/Features/AzureServiceBusFeature.cs new file mode 100644 index 00000000..5136d12e --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Features/AzureServiceBusFeature.cs @@ -0,0 +1,94 @@ +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Elsa.Extensions; +using Elsa.Features.Abstractions; +using Elsa.Features.Services; +using Elsa.Integrations.AzureServiceBus.Contracts; +using Elsa.Integrations.AzureServiceBus.Handlers; +using Elsa.Integrations.AzureServiceBus.HostedServices; +using Elsa.Integrations.AzureServiceBus.Options; +using Elsa.Integrations.AzureServiceBus.Providers; +using Elsa.Integrations.AzureServiceBus.Services; +using Elsa.Integrations.AzureServiceBus.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Elsa.Integrations.AzureServiceBus.Features; + +/// +/// Enables and configures the Azure Service Bus feature. +/// +public class AzureServiceBusFeature : FeatureBase +{ + /// + public AzureServiceBusFeature(IModule module) : base(module) + { + } + + /// + /// A value controlling whether queues, topics and subscriptions should be created automatically. + /// + public bool CreateQueuesTopicsAndSubscriptions { get; set; } = true; + + /// + /// A delegate to configure . + /// + public Action AzureServiceBusOptions { get; set; } = _ => { }; + + /// + /// A delegate to create a instance. + /// + public Func ServiceBusClientFactory { get; set; } = sp => new(GetConnectionString(sp)); + + /// + /// A delegate to create a instance. + /// + public Func ServiceBusAdministrationClientFactory { get; set; } = sp => new(GetConnectionString(sp)); + + /// + public override void ConfigureHostedServices() + { + if (CreateQueuesTopicsAndSubscriptions) + Module.ConfigureHostedService(); + } + + /// + public override void Configure() + { + // Activities. + Module.AddActivitiesFrom(); + } + + /// + public override void Apply() + { + Services.Configure(AzureServiceBusOptions); + + Services + .AddSingleton(ServiceBusAdministrationClientFactory) + .AddSingleton(ServiceBusClientFactory) + .AddSingleton() + .AddSingleton() + .AddScoped(); + + // Tasks. + Services.AddBackgroundTask(); + + // Definition providers. + Services + .AddSingleton(sp => sp.GetRequiredService()) + .AddSingleton(sp => sp.GetRequiredService()) + .AddSingleton(sp => sp.GetRequiredService()); + + // Handlers. + Services.AddHandlersFrom(); + } + + private static string GetConnectionString(IServiceProvider serviceProvider) + { + var options = serviceProvider.GetRequiredService>().Value; + var configuration = serviceProvider.GetRequiredService(); + return configuration.GetConnectionString(options.ConnectionStringOrName) ?? options.ConnectionStringOrName; + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/FodyWeavers.xml b/src/Elsa.Integrations.AzureServiceBus/FodyWeavers.xml new file mode 100644 index 00000000..00e1d9a1 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Handlers/UpdateWorkers.cs b/src/Elsa.Integrations.AzureServiceBus/Handlers/UpdateWorkers.cs new file mode 100644 index 00000000..4dc01dfa --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Handlers/UpdateWorkers.cs @@ -0,0 +1,39 @@ +using Elsa.Extensions; +using Elsa.Integrations.AzureServiceBus.Activities; +using Elsa.Integrations.AzureServiceBus.Contracts; +using Elsa.Integrations.AzureServiceBus.Models; +using Elsa.Mediator.Contracts; +using Elsa.Workflows.Runtime.Notifications; +using JetBrains.Annotations; + +namespace Elsa.Integrations.AzureServiceBus.Handlers; + +/// +/// Creates workers for each trigger & bookmark in response to updated workflow trigger indexes and bookmarks. +/// +[UsedImplicitly] +public class UpdateWorkers(IWorkerManager workerManager) : INotificationHandler, INotificationHandler +{ + /// + /// Adds, updates and removes workers based on added and removed triggers. + /// + public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken) + { + var added = notification.IndexedWorkflowTriggers.AddedTriggers.Filter().Select(x => x.GetPayload()); + await StartWorkersAsync(added, cancellationToken); + } + + /// + /// Adds, updates and removes workers based on added and removed bookmarks. + /// + public async Task HandleAsync(WorkflowBookmarksIndexed notification, CancellationToken cancellationToken) + { + var added = notification.IndexedWorkflowBookmarks.AddedBookmarks.Filter().Select(x => x.GetPayload()); + await StartWorkersAsync(added, cancellationToken); + } + + private async Task StartWorkersAsync(IEnumerable payloads, CancellationToken cancellationToken) + { + foreach (var payload in payloads) await workerManager.StartWorkerAsync(payload.QueueOrTopic, payload.Subscription, cancellationToken); + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/HostedServices/CreateQueuesTopicsAndSubscriptions.cs b/src/Elsa.Integrations.AzureServiceBus/HostedServices/CreateQueuesTopicsAndSubscriptions.cs new file mode 100644 index 00000000..17b3db15 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/HostedServices/CreateQueuesTopicsAndSubscriptions.cs @@ -0,0 +1,24 @@ +using Elsa.Integrations.AzureServiceBus.Contracts; +using JetBrains.Annotations; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Elsa.Integrations.AzureServiceBus.HostedServices; + +/// +/// A blocking hosted service that creates queues, topics and subscriptions. +/// +[UsedImplicitly] +public class CreateQueuesTopicsAndSubscriptions(IServiceScopeFactory scopeFactory) : IHostedService +{ + /// + public async Task StartAsync(CancellationToken cancellationToken) + { + await using var scope = scopeFactory.CreateAsyncScope(); + var initializer = scope.ServiceProvider.GetRequiredService(); + await initializer.InitializeAsync(cancellationToken); + } + + /// + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Models/MessageReceivedStimulus.cs b/src/Elsa.Integrations.AzureServiceBus/Models/MessageReceivedStimulus.cs new file mode 100644 index 00000000..1ecae9aa --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Models/MessageReceivedStimulus.cs @@ -0,0 +1,47 @@ +using System.Text.Json.Serialization; + +namespace Elsa.Integrations.AzureServiceBus.Models; + +/// +/// A bookmark payload model for triggering workflows when messages come in at a given queue or topic and subscription. +/// +public record MessageReceivedStimulus +{ + private readonly string _queueOrTopic = null!; + private readonly string? _subscription; + + /// + /// Constructor. + /// + [JsonConstructor] + public MessageReceivedStimulus() + { + } + + /// + /// Constructor. + /// + public MessageReceivedStimulus(string queueOrTopic, string? subscription) + { + QueueOrTopic = queueOrTopic; + Subscription = subscription; + } + + /// + /// The queue or topic to trigger from. + /// + public string QueueOrTopic + { + get => _queueOrTopic; + init => _queueOrTopic = value.ToLowerInvariant(); + } + + /// + /// The subscription to trigger from. + /// + public string? Subscription + { + get => _subscription; + init => _subscription = value?.ToLowerInvariant(); + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Models/QueueDefinition.cs b/src/Elsa.Integrations.AzureServiceBus/Models/QueueDefinition.cs new file mode 100644 index 00000000..78cb6887 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Models/QueueDefinition.cs @@ -0,0 +1,6 @@ +namespace Elsa.Integrations.AzureServiceBus.Models; + +/// +/// Represents a queue that is available to the system. +/// +public record QueueDefinition(string Name); \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Models/ReceivedServiceBusMessageModel.cs b/src/Elsa.Integrations.AzureServiceBus/Models/ReceivedServiceBusMessageModel.cs new file mode 100644 index 00000000..4890a9e2 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Models/ReceivedServiceBusMessageModel.cs @@ -0,0 +1,36 @@ +using JetBrains.Annotations; + +namespace Elsa.Integrations.AzureServiceBus.Models; + +/// +/// A serializable version of . +/// +// Needs to be a class and not a record, because of the polymorphic serialization that cannot deal with $type properties. +[PublicAPI] +public class ReceivedServiceBusMessageModel +{ + public byte[] Body { get; init; } = null!; + public string? Subject { get; init; } + public string? ContentType { get; init; } + public string? To { get; init; } + public string? CorrelationId { get; init; } + public int DeliveryCount { get; init; } + public DateTimeOffset EnqueuedTime { get; init; } + public DateTimeOffset ScheduledEnqueuedTime { get; init; } + public DateTimeOffset ExpiresAt { get; init; } + public DateTimeOffset LockedUntil { get; init; } + public TimeSpan TimeToLive { get; init; } + public string? LockToken { get; init; } + public string? MessageId { get; init; } + public string? PartitionKey { get; init; } + public string? TransactionPartitionKey { get; init; } + public string? ReplyTo { get; init; } + public long SequenceNumber { get; init; } + public long EnqueuedSequenceNumber { get; init; } + public string? SessionId { get; init; } + public string? ReplyToSessionId { get; init; } + public string? DeadLetterReason { get; init; } + public string? DeadLetterSource { get; init; } + public string? DeadLetterErrorDescription { get; init; } + public IReadOnlyDictionary ApplicationProperties { get; init; } = new Dictionary(); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Models/SubscriptionDefinition.cs b/src/Elsa.Integrations.AzureServiceBus/Models/SubscriptionDefinition.cs new file mode 100644 index 00000000..216824f6 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Models/SubscriptionDefinition.cs @@ -0,0 +1,18 @@ +namespace Elsa.Integrations.AzureServiceBus.Models; + +/// +/// Represents a topic subscription that is available to the system. +/// +public class SubscriptionDefinition +{ + /// + /// The subscription name. + /// + public string Name { get; set; } = null!; + + /// + /// The topic. + /// + [Obsolete("Use TopicDefinition.Subscriptions instead.")] + public string? Topic { get; set; } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Models/TopicDefinition.cs b/src/Elsa.Integrations.AzureServiceBus/Models/TopicDefinition.cs new file mode 100644 index 00000000..94a99996 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Models/TopicDefinition.cs @@ -0,0 +1,17 @@ +namespace Elsa.Integrations.AzureServiceBus.Models; + +/// +/// Represents a topic that is available to the system. +/// +public class TopicDefinition +{ + /// + /// The topic name. + /// + public string Name { get; set; } = null!; + + /// + /// The subscriptions. + /// + public ICollection Subscriptions { get; set; } = new List(); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Options/AzureServiceBusOptions.cs b/src/Elsa.Integrations.AzureServiceBus/Options/AzureServiceBusOptions.cs new file mode 100644 index 00000000..8af2ac56 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Options/AzureServiceBusOptions.cs @@ -0,0 +1,30 @@ +using Elsa.Integrations.AzureServiceBus.Models; + +namespace Elsa.Integrations.AzureServiceBus.Options; + +/// +/// A collection of settings to configure integration with Azure Service Bus. +/// +public class AzureServiceBusOptions +{ + /// + /// Th connection string or connection string name to connect with the service bus. + /// + public string ConnectionStringOrName { get; set; } = null!; + + /// + /// A list of s to create. + /// + public ICollection Queues { get; set; } = new List(); + + /// + /// A list of s to create. + /// + public ICollection Topics { get; set; } = new List(); + + /// + /// A list of s to create. + /// + [Obsolete("Use TopicDefinition.Subscriptions instead.")] + public ICollection Subscriptions { get; set; } = new List(); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Providers/ConfigurationQueueTopicAndSubscriptionProvider.cs b/src/Elsa.Integrations.AzureServiceBus/Providers/ConfigurationQueueTopicAndSubscriptionProvider.cs new file mode 100644 index 00000000..f655bed4 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Providers/ConfigurationQueueTopicAndSubscriptionProvider.cs @@ -0,0 +1,28 @@ +using Elsa.Integrations.AzureServiceBus.Contracts; +using Elsa.Integrations.AzureServiceBus.Models; +using Elsa.Integrations.AzureServiceBus.Options; +using Microsoft.Extensions.Options; + +namespace Elsa.Integrations.AzureServiceBus.Providers; + +/// +/// Represents a queue provider that reads queue definitions from configuration. +/// +public class ConfigurationQueueTopicAndSubscriptionProvider : IQueueProvider, ITopicProvider, ISubscriptionProvider +{ + private readonly AzureServiceBusOptions _options; + + /// + /// Constructor. + /// + public ConfigurationQueueTopicAndSubscriptionProvider(IOptions options) => _options = options.Value; + + /// + public ValueTask> GetQueuesAsync(CancellationToken cancellationToken) => new(_options.Queues); + + /// + public ValueTask> GetTopicsAsync(CancellationToken cancellationToken) => new(_options.Topics); + + /// + public ValueTask> GetSubscriptionsAsync(CancellationToken cancellationToken) => new(_options.Subscriptions); +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Services/ServiceBusInitializer.cs b/src/Elsa.Integrations.AzureServiceBus/Services/ServiceBusInitializer.cs new file mode 100644 index 00000000..a56c184e --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Services/ServiceBusInitializer.cs @@ -0,0 +1,90 @@ +using Azure.Messaging.ServiceBus.Administration; +using Elsa.Integrations.AzureServiceBus.Contracts; + +namespace Elsa.Integrations.AzureServiceBus.Services; + +/// +public class ServiceBusInitializer( + ServiceBusAdministrationClient serviceBusAdministrationClient, + IEnumerable queueProviders, + IEnumerable topicProviders, + IEnumerable subscriptionProviders) + : IServiceBusInitializer +{ + private readonly IReadOnlyCollection _queueProviders = queueProviders.ToList(); + private readonly IReadOnlyCollection _topicProviders = topicProviders.ToList(); + private readonly IReadOnlyCollection _subscriptionProviders = subscriptionProviders.ToList(); + + /// + public async Task InitializeAsync(CancellationToken cancellationToken = default) + { + var tasks = new[] + { + CreateQueuesAsync(cancellationToken), + CreateTopicsAsync(cancellationToken), + CreateSubscriptionsAsync(cancellationToken) + }; + await Task.WhenAll(tasks); + } + + private async Task CreateQueuesAsync(CancellationToken cancellationToken) + { + var definitions = (await Task.WhenAll(_queueProviders.Select(async x => await x.GetQueuesAsync(cancellationToken)))).SelectMany(x => x); + var parallelOptions = new ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = 5 + }; + await Parallel.ForEachAsync(definitions, parallelOptions, async (definition, ct) => + { + if (!await serviceBusAdministrationClient.QueueExistsAsync(definition.Name, ct)) + await serviceBusAdministrationClient.CreateQueueAsync(definition.Name, ct); + }); + } + + private async Task CreateTopicsAsync(CancellationToken cancellationToken) + { + var definitionTasks = _topicProviders.Select(async x => await x.GetTopicsAsync(cancellationToken)); + var definitions = (await Task.WhenAll(definitionTasks)).SelectMany(x => x).ToList(); + + if (!definitions.Any()) + return; + + var parallelOptions = new ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = 5 + }; + await Parallel.ForEachAsync(definitions, parallelOptions, async (topic, ct) => + { + if (!await serviceBusAdministrationClient.TopicExistsAsync(topic.Name, ct)) + await serviceBusAdministrationClient.CreateTopicAsync(topic.Name, ct); + + foreach (var subscription in topic.Subscriptions) + { + if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(topic.Name, subscription.Name, ct)) + await serviceBusAdministrationClient.CreateSubscriptionAsync(topic.Name, subscription.Name, ct); + } + }); + } + + private async Task CreateSubscriptionsAsync(CancellationToken cancellationToken) + { + var definitionTasks = _subscriptionProviders.Select(async x => await x.GetSubscriptionsAsync(cancellationToken)); + var definitions = (await Task.WhenAll(definitionTasks)).SelectMany(x => x).ToList(); + + if (!definitions.Any()) + return; + + var parallelOptions = new ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = 5 + }; + await Parallel.ForEachAsync(definitions, parallelOptions, async (definition, ct) => + { + if (!await serviceBusAdministrationClient.SubscriptionExistsAsync(definition.Topic, definition.Name, ct)) + await serviceBusAdministrationClient.CreateSubscriptionAsync(definition.Topic, definition.Name, ct); + }); + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Services/Worker.cs b/src/Elsa.Integrations.AzureServiceBus/Services/Worker.cs new file mode 100644 index 00000000..ae90c449 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Services/Worker.cs @@ -0,0 +1,122 @@ +using Azure.Messaging.ServiceBus; +using Elsa.Integrations.AzureServiceBus.Activities; +using Elsa.Integrations.AzureServiceBus.Models; +using Elsa.Workflows.Runtime; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Elsa.Integrations.AzureServiceBus.Services; + +/// +/// Processes messages received via a queue specified through the . +/// When a message is received, the appropriate workflows are executed. +/// +public class Worker : IAsyncDisposable +{ + private readonly ServiceBusProcessor _processor; + private readonly IServiceScopeFactory _serviceScopeFactory; + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + public Worker(string queueOrTopic, string? subscription, ServiceBusClient client, IServiceScopeFactory serviceScopeFactory, ILogger logger) + { + QueueOrTopic = queueOrTopic; + Subscription = subscription == "" ? null : subscription; + _serviceScopeFactory = serviceScopeFactory; + _logger = logger; + + var options = new ServiceBusProcessorOptions(); + var processor = string.IsNullOrEmpty(subscription) ? client.CreateProcessor(queueOrTopic, options) : client.CreateProcessor(queueOrTopic, subscription, options); + + processor.ProcessMessageAsync += OnMessageReceivedAsync; + processor.ProcessErrorAsync += OnErrorAsync; + _processor = processor; + } + + /// + /// The name of the queue or topic that this worker is processing. + /// + public string QueueOrTopic { get; } + + /// + /// The name of the subscription that this worker is processing. Only valid if the worker is processing a topic. + /// + public string? Subscription { get; } + + /// + /// Starts the worker. + /// + /// The cancellation token. + public async Task StartAsync(CancellationToken cancellationToken = default) => await _processor.StartProcessingAsync(cancellationToken); + + /// + /// Disposes the worker. + /// + public async ValueTask DisposeAsync() + { + _processor.ProcessMessageAsync -= OnMessageReceivedAsync; + _processor.ProcessErrorAsync -= OnErrorAsync; + await _processor.DisposeAsync(); + } + + private async Task OnMessageReceivedAsync(ProcessMessageEventArgs args) => await InvokeWorkflowsAsync(args.Message, args.CancellationToken); + + private Task OnErrorAsync(ProcessErrorEventArgs args) + { + _logger.LogError(args.Exception, "An error occurred while processing {EntityPath}", args.EntityPath); + return Task.CompletedTask; + } + + private async Task InvokeWorkflowsAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken) + { + var input = new Dictionary + { + [MessageReceived.InputKey] = CreateMessageModel(message) + }; + + var metadata = new StimulusMetadata + { + CorrelationId = message.CorrelationId, + Input = input, + }; + var stimulus = new MessageReceivedStimulus(QueueOrTopic, Subscription); + await using var scope = _serviceScopeFactory.CreateAsyncScope(); + var stimulusSender = scope.ServiceProvider.GetRequiredService(); + var result = await stimulusSender.SendAsync(stimulus, metadata, cancellationToken); + + _logger.LogDebug("{Count} workflow triggered by the service bus message", result.WorkflowInstanceResponses.Count); + } + + private static ReceivedServiceBusMessageModel CreateMessageModel(ServiceBusReceivedMessage message) + { + return new ReceivedServiceBusMessageModel + { + Body = message.Body.ToArray(), + Subject = message.Subject, + ContentType = message.ContentType, + To = message.To, + CorrelationId = message.CorrelationId, + DeliveryCount = message.DeliveryCount, + EnqueuedTime = message.EnqueuedTime, + ScheduledEnqueuedTime = message.ScheduledEnqueueTime, + ExpiresAt = message.ExpiresAt, + LockedUntil = message.LockedUntil, + TimeToLive = message.TimeToLive, + LockToken = message.LockToken, + MessageId = message.MessageId, + PartitionKey = message.PartitionKey, + TransactionPartitionKey = message.TransactionPartitionKey, + ReplyTo = message.ReplyTo, + SequenceNumber = message.SequenceNumber, + EnqueuedSequenceNumber = message.EnqueuedSequenceNumber, + SessionId = message.SessionId, + ReplyToSessionId = message.ReplyToSessionId, + DeadLetterReason = message.DeadLetterReason, + DeadLetterSource = message.DeadLetterSource, + DeadLetterErrorDescription = message.DeadLetterErrorDescription, + ApplicationProperties = message.ApplicationProperties + }; + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Services/WorkerManager.cs b/src/Elsa.Integrations.AzureServiceBus/Services/WorkerManager.cs new file mode 100644 index 00000000..d0e16e52 --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Services/WorkerManager.cs @@ -0,0 +1,43 @@ +using Elsa.Integrations.AzureServiceBus.Contracts; +using Microsoft.Extensions.DependencyInjection; + +namespace Elsa.Integrations.AzureServiceBus.Services; + +/// +/// Manages message workers. +/// +public class WorkerManager(IServiceProvider serviceProvider) : IWorkerManager, IAsyncDisposable +{ + private readonly ICollection _workers = new List(); + + /// + /// A list of workers under management. + /// + public IEnumerable Workers => _workers.ToList(); + + /// + public async Task StartWorkerAsync(string queueOrTopic, string? subscription, CancellationToken cancellationToken = default) + { + var worker = FindWorkerFor(queueOrTopic, subscription); + if (worker != null) return; + await CreateWorkerAsync(queueOrTopic, subscription, cancellationToken); + } + + /// + public Worker? FindWorkerFor(string queueOrTopic, string? subscription) => _workers.FirstOrDefault(x => x.QueueOrTopic == queueOrTopic && x.Subscription == subscription); + + private async Task CreateWorkerAsync(string queueOrTopic, string? subscription, CancellationToken cancellationToken = default) + { + subscription ??= ""; + var worker = ActivatorUtilities.CreateInstance(serviceProvider, queueOrTopic, subscription!); + + _workers.Add(worker); + await worker.StartAsync(cancellationToken); + } + + /// + public async ValueTask DisposeAsync() + { + foreach (var worker in Workers) await worker.DisposeAsync(); + } +} \ No newline at end of file diff --git a/src/Elsa.Integrations.AzureServiceBus/Tasks/StartWorkers.cs b/src/Elsa.Integrations.AzureServiceBus/Tasks/StartWorkers.cs new file mode 100644 index 00000000..0e22de4f --- /dev/null +++ b/src/Elsa.Integrations.AzureServiceBus/Tasks/StartWorkers.cs @@ -0,0 +1,49 @@ +using Elsa.Common; +using Elsa.Extensions; +using Elsa.Integrations.AzureServiceBus.Activities; +using Elsa.Integrations.AzureServiceBus.Contracts; +using Elsa.Integrations.AzureServiceBus.Models; +using Elsa.Workflows.Helpers; +using Elsa.Workflows.Runtime; +using Elsa.Workflows.Runtime.Filters; +using JetBrains.Annotations; + +namespace Elsa.Integrations.AzureServiceBus.Tasks; + +/// +/// Creates workers for each trigger & bookmark in response to updated workflow trigger indexes and bookmarks. +/// +[UsedImplicitly] +public class StartWorkers(ITriggerStore triggerStore, IBookmarkStore bookmarkStore, IWorkerManager workerManager) : BackgroundTask +{ + /// + public override async Task StartAsync(CancellationToken cancellationToken) + { + var activityType = ActivityTypeNameHelper.GenerateTypeName(); + var triggerFilter = new TriggerFilter + { + Name = activityType + }; + var triggerStimuli = (await triggerStore.FindManyAsync(triggerFilter, cancellationToken)).Select(x => x.GetPayload()).ToList(); + var bookmarkFilter = new BookmarkFilter + { + ActivityTypeName = activityType + }; + var bookmarkStimuli = (await bookmarkStore.FindManyAsync(bookmarkFilter, cancellationToken)).Select(x => x.GetPayload()).ToList(); + var stimuli = triggerStimuli.Concat(bookmarkStimuli).ToList(); + + await EnsureWorkersAsync(stimuli, cancellationToken); + } + + /// + public override Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + private async Task EnsureWorkersAsync(IEnumerable stimuli, CancellationToken cancellationToken) + { + foreach (var stimulus in stimuli) + await workerManager.StartWorkerAsync(stimulus.QueueOrTopic, stimulus.Subscription, cancellationToken); + } +} \ No newline at end of file