Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.18.3" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.18.4" />
<PackageVersion Include="Azure.ResourceManager" Version="1.13.0" />
<PackageVersion Include="Azure.ResourceManager.AppContainers" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.Resources" Version="1.9.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.0.1" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.23.0" />
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
<PackageVersion Include="Bogus" Version="35.6.1" />
<PackageVersion Include="Bogus" Version="35.6.2" />
<PackageVersion Include="ConfigureAwait.Fody" Version="3.3.2" />
<PackageVersion Include="coverlet.collector" Version="6.0.4" />
<PackageVersion Include="Elsa" Version="$(ElsaVersion)" />
<PackageVersion Include="Elsa.Workflows.Core" Version="$(ElsaVersion)" />
<PackageVersion Include="Elsa.Http" Version="$(ElsaVersion)" />
<PackageVersion Include="Elsa.Webhook" Version="$(ElsaVersion)" />
<PackageVersion Include="Elsa.Webhooks" Version="3.3.3" />
<PackageVersion Include="Elsa.Webhooks" Version="$(ElsaVersion)" />
<PackageVersion Include="Elsa.Workflows.Management" Version="$(ElsaVersion)" />
<PackageVersion Include="Elsa.Workflows.Runtime" Version="$(ElsaVersion)" />
<PackageVersion Include="FastEndpoints" Version="5.34.0" />
<PackageVersion Include="FastEndpoints.Security" Version="5.34.0" />
<PackageVersion Include="FastEndpoints.Swagger" Version="5.34.0" />
Expand All @@ -27,25 +29,34 @@
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="Humanizer.Core" Version="2.14.1" />
<PackageVersion Include="JetBrains.Annotations" Version="2024.3.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.7" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Scripting" Version="4.10.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Scripting" Version="4.12.0" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite.Design" Version="1.1.6" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<PackageVersion Include="Refit" Version="8.0.0" />
<PackageVersion Include="Refit.HttpClientFactory" Version="8.0.0" />
<PackageVersion Include="SlackNet" Version="0.15.5" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
<PackageVersion Include="xunit.categories" Version="2.0.8" />
<PackageVersion Include="xunit.extensibility.core" Version="2.9.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="System.Text.Json" Version="9.01" />
<PackageVersion Include="xunit.extensibility.core" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.13" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
<PackageVersion Include="System.Text.Json" Version="8.0.5" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net9.0'">
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.2" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="9.0.2" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="9.0.2" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.2" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.2" />
<PackageVersion Include="System.Text.Json" Version="9.0.2" />
</ItemGroup>
</Project>
7 changes: 7 additions & 0 deletions Elsa.Integrations.sln
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Integrations.Telnyx",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Integrations.OrchardCore", "src\Elsa.Integrations.OrchardCore\Elsa.Integrations.OrchardCore.csproj", "{71D93DC7-A455-4EDC-86DB-826CCEECEEF8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Integrations.AzureServiceBus", "src\Elsa.Integrations.AzureServiceBus\Elsa.Integrations.AzureServiceBus.csproj", "{BC283CFE-D542-4D40-AE7F-6888B876AA2B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -58,6 +60,10 @@ Global
{71D93DC7-A455-4EDC-86DB-826CCEECEEF8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{71D93DC7-A455-4EDC-86DB-826CCEECEEF8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{71D93DC7-A455-4EDC-86DB-826CCEECEEF8}.Release|Any CPU.Build.0 = Release|Any CPU
{BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BC283CFE-D542-4D40-AE7F-6888B876AA2B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -68,5 +74,6 @@ Global
{861E1230-F9CB-450C-845E-04DFDA259E26} = {A99FA26E-2098-403A-BD04-6BBCFBE3AC7D}
{128B2FC3-81A7-4327-9665-9155B05F21DA} = {527248D6-B851-4C8D-8667-E2FB0A91DABF}
{71D93DC7-A455-4EDC-86DB-826CCEECEEF8} = {527248D6-B851-4C8D-8667-E2FB0A91DABF}
{BC283CFE-D542-4D40-AE7F-6888B876AA2B} = {527248D6-B851-4C8D-8667-E2FB0A91DABF}
EndGlobalSection
EndGlobal
126 changes: 126 additions & 0 deletions src/Elsa.Integrations.AzureServiceBus/Activities/MessageReceived.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using System.Runtime.CompilerServices;
using Elsa.Common;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Integrations.AzureServiceBus.Models;
using Elsa.Workflows;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;

namespace Elsa.Integrations.AzureServiceBus.Activities;

/// <summary>
/// Triggered when a message is received on a specified queue or topic and subscription.
/// </summary>
[Activity("Elsa.AzureServiceBus", "Azure Service Bus", "Executes when a message is received from the configured queue or topic and subscription")]
public class MessageReceived : Trigger
{
internal const string InputKey = "TransportMessage";

/// <inheritdoc />
public MessageReceived([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
{
}

/// <inheritdoc />
public MessageReceived(Input<string> queue)
{
QueueOrTopic = queue;
}

/// <inheritdoc />
public MessageReceived(string queue) : this(new Input<string>(queue))
{
}

/// <inheritdoc />
public MessageReceived(Input<string> topic, Input<string> subscription)
{
QueueOrTopic = topic;
Subscription = subscription;
}

/// <inheritdoc />
public MessageReceived(string topic, string subscription) : this(new Input<string>(topic), new Input<string>(subscription))
{
}

/// <summary>
/// The name of the queue or topic to read from.
/// </summary>
[Input(Description = "The name of the queue or topic to read from.")]
public Input<string> QueueOrTopic { get; set; } = null!;

/// <summary>
/// The name of the subscription to read from.
/// </summary>
[Input(Description = "The name of the subscription to read from.")]
public Input<string>? Subscription { get; set; }

/// <summary>
/// The .NET type to deserialize the message into. Defaults to <see cref="string"/>.
/// </summary>
[Input(Description = "The .NET type to deserialize the message into.")]
public Input<Type> MessageType { get; set; } = new(typeof(string));

/// <summary>
/// The received transport message.
/// </summary>
[Output(Description = "The received transport message.")]
public Output<ReceivedServiceBusMessageModel> TransportMessage { get; set; } = null!;

/// <summary>
/// The received transport message.
/// </summary>
[Output(Description = "The received message.")]
public Output<object> Message { get; set; } = null!;

/// <summary>
/// The formatter to use to parse the message.
/// </summary>
[Input(Description = "The formatter to use to serialize the message.")]
public Input<IFormatter?> Formatter { get; set; } = null!;

/// <inheritdoc />
protected override object GetTriggerPayload(TriggerIndexingContext context) => GetStimulus(context.ExpressionExecutionContext);

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
// If we did not receive external input, it means we are just now encountering this activity.
if (context.IsTriggerOfWorkflow())
{
await Resume(context);
}
else
{
// Create bookmarks for when we receive the expected HTTP request.
context.CreateBookmark(GetStimulus(context.ExpressionExecutionContext), Resume,false);
}
}

private async ValueTask Resume(ActivityExecutionContext context)
{
var receivedMessage = context.GetWorkflowInput<ReceivedServiceBusMessageModel>(InputKey);
await SetResultAsync(receivedMessage, context);
await context.CompleteActivityAsync();
}

private async Task SetResultAsync(ReceivedServiceBusMessageModel receivedMessage, ActivityExecutionContext context)
{
var bodyAsString = new BinaryData(receivedMessage.Body).ToString();
var targetType = context.Get(MessageType);
var formatter = Formatter.GetOrDefault(context);
var body = formatter == null ? bodyAsString : await formatter.FromStringAsync(bodyAsString, targetType, context.CancellationToken);

context.Set(TransportMessage, receivedMessage);
context.Set(Message, body);
}

private object GetStimulus(ExpressionExecutionContext context)
{
var queueOrTopic = context.Get(QueueOrTopic)!;
var subscription = context.Get(Subscription);
return new MessageReceivedStimulus(queueOrTopic, subscription);
}
}
104 changes: 104 additions & 0 deletions src/Elsa.Integrations.AzureServiceBus/Activities/SendMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
using System.Runtime.CompilerServices;
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Elsa.Common;
using Elsa.Common.Services;
using Elsa.Extensions;
using Elsa.Workflows;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;
using Elsa.Workflows.UIHints;
using JetBrains.Annotations;

namespace Elsa.Integrations.AzureServiceBus.Activities;

/// <summary>
/// Sends a message to a queue or topic in Azure Service Bus.
/// </summary>
[Activity("Elsa.AzureServiceBus.Send", "Azure Service Bus", "Send a message to a queue or topic")]
[PublicAPI]
public class SendMessage : CodeActivity
{
/// <inheritdoc />
public SendMessage([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
{
}

/// <summary>
/// The contents of the message to send.
/// </summary>
[Input(Description = "The contents of the message to send.")]
public Input<object> MessageBody { get; set; } = null!;

/// <summary>
/// The queue or topic to send the message to.
/// </summary>
public Input<string> QueueOrTopic { get; set; } = null!;

/// <summary>
/// The content type of the message.
/// </summary>
public Input<string>? ContentType { get; set; }

/// <summary>
/// The subject of the message.
/// </summary>
public Input<string>? Subject { get; set; }

/// <summary>
/// The correlation ID of the message.
/// </summary>
public Input<string>? CorrelationId { get; set; }

/// <summary>
/// The formatter to use when serializing the message body.
/// </summary>
public Input<Type?> FormatterType { get; set; } = null!;

/// <summary>
/// The application properties to embed with the Service Bus Message
/// </summary>
[Input(Category = "Advanced",
DefaultSyntax = "Json",
SupportedSyntaxes = ["JavaScript", "Json"],
UIHint = InputUIHints.MultiLine)
]
public Input<IDictionary<string, object>?> ApplicationProperties { get; set; } = null!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var queueOrTopic = context.Get(QueueOrTopic);
var messageBody = context.Get(MessageBody);
var cancellationToken = context.CancellationToken;
var serializedMessageBody = await SerializeMessageBodyAsync(context, messageBody!, cancellationToken);

var message = new ServiceBusMessage(serializedMessageBody)
{
ContentType = context.Get(ContentType),
Subject = context.Get(Subject),
CorrelationId = context.Get(CorrelationId)
};

var applicationProperties = ApplicationProperties.GetOrDefault(context);

if (applicationProperties != null)
foreach (var property in applicationProperties)
message.ApplicationProperties.Add(property.Key, ((JsonElement)property.Value).GetString());

var client = context.GetRequiredService<ServiceBusClient>();
await using var sender = client.CreateSender(queueOrTopic);
await sender.SendMessageAsync(message, cancellationToken);
}

private async ValueTask<BinaryData> SerializeMessageBodyAsync(ActivityExecutionContext context, object value, CancellationToken cancellationToken)
{
if (value is string s) return BinaryData.FromString(s);

var formatterType = FormatterType.GetOrDefault(context) ?? typeof(JsonFormatter);
var formatter = context.GetServices<IFormatter>().First(x => x.GetType() == formatterType);
var data = await formatter.ToStringAsync(value, cancellationToken);

return BinaryData.FromString(data);
}
}
14 changes: 14 additions & 0 deletions src/Elsa.Integrations.AzureServiceBus/Contracts/IQueueProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Elsa.Integrations.AzureServiceBus.Models;

namespace Elsa.Integrations.AzureServiceBus.Contracts;

/// <summary>
/// Provides queue definitions to the system.
/// </summary>
public interface IQueueProvider
{
/// <summary>
/// Returns a list of <see cref="QueueDefinition"/>s.
/// </summary>
ValueTask<ICollection<QueueDefinition>> GetQueuesAsync(CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Elsa.Integrations.AzureServiceBus.Contracts;

/// <summary>
/// Creates queues, topics and subscriptions provided by <see cref="IQueueProvider"/>, <see cref="ITopicProvider"/> and <see cref="ISubscriptionProvider"/> implementations.
/// </summary>
public interface IServiceBusInitializer
{
/// <summary>
/// Creates queues, topics and subscriptions provided by <see cref="IQueueProvider"/>, <see cref="ITopicProvider"/> and <see cref="ISubscriptionProvider"/> implementations.
/// </summary>
Task InitializeAsync(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Elsa.Integrations.AzureServiceBus.Models;

namespace Elsa.Integrations.AzureServiceBus.Contracts;

/// <summary>
/// Provides subscription definitions to the system.
/// </summary>
[Obsolete("Use AzureServiceBusOptions.Topics instead.")]

public interface ISubscriptionProvider
{
/// <summary>
/// Return a list of <see cref="SubscriptionDefinition"/>s.
/// </summary>
ValueTask<ICollection<SubscriptionDefinition>> GetSubscriptionsAsync(CancellationToken cancellationToken);
}
14 changes: 14 additions & 0 deletions src/Elsa.Integrations.AzureServiceBus/Contracts/ITopicProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Elsa.Integrations.AzureServiceBus.Models;

namespace Elsa.Integrations.AzureServiceBus.Contracts;

/// <summary>
/// Provides topic definitions to the system.
/// </summary>
public interface ITopicProvider
{
/// <summary>
/// Returns a list of <see cref="TopicDefinition"/>s.
/// </summary>
ValueTask<ICollection<TopicDefinition>> GetTopicsAsync(CancellationToken cancellationToken);
}
Loading