diff --git a/Cleipnir.Flows.Tests.AspNet/Cleipnir.Flows.Tests.AspNet.csproj b/Cleipnir.Flows.Tests.AspNet/Cleipnir.Flows.Tests.AspNet.csproj index e65949f..5be85d1 100644 --- a/Cleipnir.Flows.Tests.AspNet/Cleipnir.Flows.Tests.AspNet.csproj +++ b/Cleipnir.Flows.Tests.AspNet/Cleipnir.Flows.Tests.AspNet.csproj @@ -20,7 +20,7 @@ - + diff --git a/Cleipnir.Flows.Tests.AspNet/IntegrationTest.cs b/Cleipnir.Flows.Tests.AspNet/IntegrationTest.cs index 25cd0d7..d2161f5 100644 --- a/Cleipnir.Flows.Tests.AspNet/IntegrationTest.cs +++ b/Cleipnir.Flows.Tests.AspNet/IntegrationTest.cs @@ -40,7 +40,7 @@ public async Task SunshineScenarioPostgres() [TestMethod] public async Task SunshineScenarioMySql() { - var store = await MySqlHelper.CreateAndInitializeMySqlStore(); + var store = await MariaDbHelper.CreateAndInitializeMySqlStore(); const string hostUrl = "http://localhost:5003"; var startFlowUrl = $"{hostUrl}/startFlow"; await SunshineScenario(hostUrl, startFlowUrl, store); diff --git a/Cleipnir.Flows.Tests.AspNet/MySqlHelper.cs b/Cleipnir.Flows.Tests.AspNet/MariaDbHelper.cs similarity index 85% rename from Cleipnir.Flows.Tests.AspNet/MySqlHelper.cs rename to Cleipnir.Flows.Tests.AspNet/MariaDbHelper.cs index 27129bf..0904c2a 100644 --- a/Cleipnir.Flows.Tests.AspNet/MySqlHelper.cs +++ b/Cleipnir.Flows.Tests.AspNet/MariaDbHelper.cs @@ -1,14 +1,14 @@ -using Cleipnir.ResilientFunctions.MySQL; +using Cleipnir.ResilientFunctions.MariaDb; using MySqlConnector; namespace Cleipnir.Flows.Tests.AspNet { - public static class MySqlHelper + public static class MariaDbHelper { public static string ConnectionString { get; } public static Func> ConnFunc { get; set; } - static MySqlHelper() + static MariaDbHelper() { ConnectionString = Environment.GetEnvironmentVariable("Cleipnir.RFunctions.MySQL.Tests.ConnectionString") @@ -40,11 +40,11 @@ public static void CreateDatabase() } } - public static async Task CreateAndInitializeMySqlStore() + public static async Task CreateAndInitializeMySqlStore() { CreateDatabase(); - var store = new MySqlFunctionStore(ConnectionString, tablePrefix: "MySqlFlows"); + var store = new MariaDbFunctionStore(ConnectionString, tablePrefix: "MySqlFlows"); await store.Initialize(); await store.TruncateTables(); return store; diff --git a/Cleipnir.Flows.Tests/Flows/OptionsTests.cs b/Cleipnir.Flows.Tests/Flows/OptionsTests.cs index ce8206c..332681a 100644 --- a/Cleipnir.Flows.Tests/Flows/OptionsTests.cs +++ b/Cleipnir.Flows.Tests/Flows/OptionsTests.cs @@ -55,9 +55,11 @@ await Should.ThrowAsync( public async Task FlowNameCanBeSpecifiedFromTheOutside() { var serviceCollection = new ServiceCollection(); - + var store = new InMemoryFunctionStore(); + var storedType = await store.TypeStore.InsertOrGetStoredType("SomeOtherFlowName"); + serviceCollection.AddFlows(c => c - .UseInMemoryStore() + .UseInMemoryStore(store) .WithOptions(new Options(messagesDefaultMaxWaitForCompletion: TimeSpan.MaxValue)) .RegisterFlow( flowsFactory: sp => new SimpleFlows( @@ -70,8 +72,7 @@ public async Task FlowNameCanBeSpecifiedFromTheOutside() var sp = serviceCollection.BuildServiceProvider(); var flows = sp.GetRequiredService(); await flows.Run("Id"); - var store = sp.GetRequiredService(); - var sf = await store.GetFunction(new FlowId("SomeOtherFlowName", "Id")); + var sf = await store.GetFunction(new StoredId(storedType, Instance: "Id")); sf.ShouldNotBeNull(); sf.Status.ShouldBe(Status.Succeeded); } diff --git a/Cleipnir.Flows.sln b/Cleipnir.Flows.sln index a0c0417..74a41eb 100644 --- a/Cleipnir.Flows.sln +++ b/Cleipnir.Flows.sln @@ -37,11 +37,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Stores", "Stores", "{9A4E176B-52F2-4375-87DB-AB66F0C37646}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions.MySQL", "Cleipnir.ResilientFunctions\Stores\MySQL\Cleipnir.ResilientFunctions.MySQL\Cleipnir.ResilientFunctions.MySQL.csproj", "{B6C02093-1375-48B1-A4D7-E6F4A0581506}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions.SqlServer", "Cleipnir.ResilientFunctions\Stores\SqlServer\Cleipnir.ResilientFunctions.SqlServer\Cleipnir.ResilientFunctions.SqlServer.csproj", "{BC926BA5-072D-4098-9792-712C21C61189}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.MySQL", "Stores\Cleipnir.Flows.MySQL\Cleipnir.Flows.MySQL.csproj", "{4C623459-7BB1-4FF6-8812-85371EA0DE04}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.MariaDB", "Stores\Cleipnir.Flows.MariaDB\Cleipnir.Flows.MariaDB.csproj", "{4C623459-7BB1-4FF6-8812-85371EA0DE04}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.PostgresSql", "Stores\Cleipnir.Flows.PostgresSql\Cleipnir.Flows.PostgresSql.csproj", "{EE5A28B6-AD40-4462-8008-8B54F21E82E0}" EndProject @@ -77,6 +75,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.NServiceBus. EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.NServiceBus.Tests", "ServiceBuses\NServiceBus\Cleipnir.Flows.NServiceBus.Tests\Cleipnir.Flows.NServiceBus.Tests.csproj", "{7A7ABDAB-3EFD-4532-B794-64C999B77C72}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions.MariaDB", "Cleipnir.ResilientFunctions\Stores\MariaDB\Cleipnir.ResilientFunctions.MariaDB\Cleipnir.ResilientFunctions.MariaDB.csproj", "{027072F7-E753-4CF4-8156-C1B502377FF7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -130,10 +130,6 @@ Global {785F156E-024C-4D30-83D0-C077B3CCC0CE}.Debug|Any CPU.Build.0 = Debug|Any CPU {785F156E-024C-4D30-83D0-C077B3CCC0CE}.Release|Any CPU.ActiveCfg = Release|Any CPU {785F156E-024C-4D30-83D0-C077B3CCC0CE}.Release|Any CPU.Build.0 = Release|Any CPU - {B6C02093-1375-48B1-A4D7-E6F4A0581506}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {B6C02093-1375-48B1-A4D7-E6F4A0581506}.Debug|Any CPU.Build.0 = Debug|Any CPU - {B6C02093-1375-48B1-A4D7-E6F4A0581506}.Release|Any CPU.ActiveCfg = Release|Any CPU - {B6C02093-1375-48B1-A4D7-E6F4A0581506}.Release|Any CPU.Build.0 = Release|Any CPU {BC926BA5-072D-4098-9792-712C21C61189}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {BC926BA5-072D-4098-9792-712C21C61189}.Debug|Any CPU.Build.0 = Debug|Any CPU {BC926BA5-072D-4098-9792-712C21C61189}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -190,6 +186,10 @@ Global {7A7ABDAB-3EFD-4532-B794-64C999B77C72}.Debug|Any CPU.Build.0 = Debug|Any CPU {7A7ABDAB-3EFD-4532-B794-64C999B77C72}.Release|Any CPU.ActiveCfg = Release|Any CPU {7A7ABDAB-3EFD-4532-B794-64C999B77C72}.Release|Any CPU.Build.0 = Release|Any CPU + {027072F7-E753-4CF4-8156-C1B502377FF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {027072F7-E753-4CF4-8156-C1B502377FF7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {027072F7-E753-4CF4-8156-C1B502377FF7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {027072F7-E753-4CF4-8156-C1B502377FF7}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {BC470FDE-839E-4B2C-8C56-8B20DEEF2CCD} = {EAD350F1-8EF2-48AC-98EE-A5E739F15787} @@ -202,7 +202,6 @@ Global {19BAF8B6-41E4-45F5-B2E2-97B60A9B6372} = {5E8A4831-5FF4-4BEC-9334-847943DA9517} {9A4E176B-52F2-4375-87DB-AB66F0C37646} = {7204001F-410C-41DB-9892-696E06D0703D} {785F156E-024C-4D30-83D0-C077B3CCC0CE} = {9A4E176B-52F2-4375-87DB-AB66F0C37646} - {B6C02093-1375-48B1-A4D7-E6F4A0581506} = {9A4E176B-52F2-4375-87DB-AB66F0C37646} {BC926BA5-072D-4098-9792-712C21C61189} = {9A4E176B-52F2-4375-87DB-AB66F0C37646} {4C623459-7BB1-4FF6-8812-85371EA0DE04} = {8D89DB84-E378-40AF-8CA3-16673868D494} {EE5A28B6-AD40-4462-8008-8B54F21E82E0} = {8D89DB84-E378-40AF-8CA3-16673868D494} @@ -221,5 +220,6 @@ Global {AF7E47FD-B1C2-4B90-A3A3-1D24ED9668E3} = {0872461B-3FC1-4659-AE2D-80109CDA3F5E} {0280F37A-575B-4F08-9699-D8BECFCA0A3C} = {AF7E47FD-B1C2-4B90-A3A3-1D24ED9668E3} {7A7ABDAB-3EFD-4532-B794-64C999B77C72} = {AF7E47FD-B1C2-4B90-A3A3-1D24ED9668E3} + {027072F7-E753-4CF4-8156-C1B502377FF7} = {9A4E176B-52F2-4375-87DB-AB66F0C37646} EndGlobalSection EndGlobal diff --git a/Cleipnir.Flows/AspNet/FlowsModule.cs b/Cleipnir.Flows/AspNet/FlowsModule.cs index d5c71be..a4beba4 100644 --- a/Cleipnir.Flows/AspNet/FlowsModule.cs +++ b/Cleipnir.Flows/AspNet/FlowsModule.cs @@ -42,9 +42,9 @@ public FlowsConfigurator(IServiceCollection services) Services = services; } - public FlowsConfigurator UseInMemoryStore() + public FlowsConfigurator UseInMemoryStore(InMemoryFunctionStore? store = null) { - Services.AddSingleton(new InMemoryFunctionStore()); + Services.AddSingleton(store ?? new InMemoryFunctionStore()); return this; } diff --git a/Cleipnir.Flows/Flows.cs b/Cleipnir.Flows/Flows.cs index b426bd9..42bf2ab 100644 --- a/Cleipnir.Flows/Flows.cs +++ b/Cleipnir.Flows/Flows.cs @@ -9,6 +9,7 @@ using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; +using Cleipnir.ResilientFunctions.Storage; using Microsoft.Extensions.DependencyInjection; namespace Cleipnir.Flows; @@ -18,7 +19,7 @@ public interface IBaseFlows public static abstract Type FlowType { get; } public Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) where T : notnull; - public Task> GetInstances(Status? status = null); + public Task> GetInstances(Status? status = null); } public abstract class BaseFlows : IBaseFlows where TFlow : notnull @@ -29,7 +30,7 @@ public abstract class BaseFlows : IBaseFlows where TFlow : notnull protected BaseFlows(FlowsContainer flowsContainer) => FlowsContainer = flowsContainer; - public abstract Task> GetInstances(Status? status = null); + public abstract Task> GetInstances(Status? status = null); private static Action CreateWorkflowSetter() { @@ -103,7 +104,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, Options? options = => _registration.GetState(functionInstanceId); public MessageWriter MessageWriter(string instanceId) - => _registration.MessageWriters.For(instanceId); + => _registration.MessageWriters.For(instanceId.ToFlowInstance()); public Task Run(string instanceId) => _registration.Invoke(instanceId); @@ -119,7 +120,7 @@ public override Task RouteMessage(T message, string correlationId, string? id public Task BulkSchedule(IEnumerable instanceIds) => _registration.BulkSchedule(instanceIds); - public override Task> GetInstances(Status? status = null) + public override Task> GetInstances(Status? status = null) => _registration.GetInstances(status); public Task SendMessage(FlowInstance flowInstance, T message, bool create = true, string? idempotencyKey = null) where T : notnull @@ -159,7 +160,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, Options? options = => _registration.GetState(functionInstanceId); public MessageWriter MessageWriter(string instanceId) - => _registration.MessageWriters.For(instanceId); + => _registration.MessageWriters.For(instanceId.ToFlowInstance()); public Task Run(string instanceId, TParam param) => _registration.Invoke(instanceId, param); @@ -181,7 +182,7 @@ TimeSpan delay public override Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) => _registration.RouteMessage(message, correlationId, idempotencyKey); - public override Task> GetInstances(Status? status = null) + public override Task> GetInstances(Status? status = null) => _registration.GetInstances(status); public Task SendMessage(FlowInstance flowInstance, T message, string? idempotencyKey = null) where T : notnull @@ -214,7 +215,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, Options? options = => _registration.ControlPanel(instanceId); public MessageWriter MessageWriter(string instanceId) - => _registration.MessageWriters.For(instanceId); + => _registration.MessageWriters.For(instanceId.ToFlowInstance()); public Task Run(string instanceId, TParam param) => _registration.Invoke(instanceId, param); @@ -239,7 +240,7 @@ TimeSpan delay public override Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) => _registration.RouteMessage(message, correlationId, idempotencyKey); - public override Task> GetInstances(Status? status = null) + public override Task> GetInstances(Status? status = null) => _registration.GetInstances(status); public Task SendMessage(FlowInstance flowInstance, T message, string? idempotencyKey = null) where T : notnull diff --git a/Cleipnir.ResilientFunctions b/Cleipnir.ResilientFunctions index 1a43766..a28e358 160000 --- a/Cleipnir.ResilientFunctions +++ b/Cleipnir.ResilientFunctions @@ -1 +1 @@ -Subproject commit 1a4376645f0c01cea179c6dc86f0ca407f2d808b +Subproject commit a28e358856708bd721af39307475a96b6b0298cd diff --git a/README.md b/README.md index 94d8347..2905f9d 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ It works for both RPC and Message-based communication. ## Getting Started To get started simply perform the following three steps in an ASP.NET service: -Firstly, install the Cleipnir.Flows nuget package (using either Postgres, SqlServer or MySQL as persistence layer). I.e. +Firstly, install the Cleipnir.Flows nuget package (using either Postgres, SqlServer or MariaDB as persistence layer). I.e. ```powershell Install-Package Cleipnir.Flows.Postgres ``` diff --git a/ServiceBuses/MassTransit/Cleipnir.Flows.MassTransit.Console/Program.cs b/ServiceBuses/MassTransit/Cleipnir.Flows.MassTransit.Console/Program.cs index 2d4b5f6..0533667 100644 --- a/ServiceBuses/MassTransit/Cleipnir.Flows.MassTransit.Console/Program.cs +++ b/ServiceBuses/MassTransit/Cleipnir.Flows.MassTransit.Console/Program.cs @@ -14,7 +14,8 @@ public static async Task Main(string[] args) var host = await CreateHostBuilder([]).StartAsync(); var bus = host.Services.GetRequiredService(); var store = host.Services.GetRequiredService(); - + var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow)); + var testSize = 1_000; for (var i = 0; i < testSize; i++) await bus.Publish(new MyMessage(i.ToString())); @@ -22,7 +23,7 @@ public static async Task Main(string[] args) while (true) { var succeeded = await store.GetSucceededFunctions( - nameof(SimpleFlow), + simpleFlowStoredType, DateTime.UtcNow.Ticks + 1_000_000 ).SelectAsync(f => f.Count); if (succeeded == testSize) diff --git a/ServiceBuses/NServiceBus/Cleipnir.Flows.NServiceBus.Console/Program.cs b/ServiceBuses/NServiceBus/Cleipnir.Flows.NServiceBus.Console/Program.cs index b26fb0b..baf6a31 100644 --- a/ServiceBuses/NServiceBus/Cleipnir.Flows.NServiceBus.Console/Program.cs +++ b/ServiceBuses/NServiceBus/Cleipnir.Flows.NServiceBus.Console/Program.cs @@ -14,6 +14,8 @@ public static async Task Main(string[] args) var bus = host.Services.GetRequiredService(); var store = host.Services.GetRequiredService(); + var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow)); + var testSize = 1_000; for (var i = 0; i < testSize; i++) await bus.Publish(new MyMessage(i.ToString())); @@ -21,7 +23,7 @@ public static async Task Main(string[] args) while (true) { var succeeded = await store.GetSucceededFunctions( - nameof(SimpleFlow), + simpleFlowStoredType, DateTime.UtcNow.Ticks + 1_000_000 ).SelectAsync(f => f.Count); if (succeeded == testSize) diff --git a/ServiceBuses/Rebus/Cleipnir.Flows.Rebus.Console/Program.cs b/ServiceBuses/Rebus/Cleipnir.Flows.Rebus.Console/Program.cs index 7cd465e..27cf6f0 100644 --- a/ServiceBuses/Rebus/Cleipnir.Flows.Rebus.Console/Program.cs +++ b/ServiceBuses/Rebus/Cleipnir.Flows.Rebus.Console/Program.cs @@ -17,13 +17,15 @@ public static async Task Main(string[] args) var bus = host.Services.GetRequiredService(); var store = host.Services.GetRequiredService(); + var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow)); + for (var i = 0; i < 1_000; i++) await bus.SendLocal(new MyMessage(i.ToString())); while (true) { var succeeded = await store.GetSucceededFunctions( - nameof(SimpleFlow), + simpleFlowStoredType, DateTime.UtcNow.Ticks + 1_000_000 ).SelectAsync(f => f.Count); if (succeeded == 1_000) diff --git a/ServiceBuses/Wolverine/Cleipnir.Flows.Wolverine.Console/Program.cs b/ServiceBuses/Wolverine/Cleipnir.Flows.Wolverine.Console/Program.cs index ab29423..fa95cc8 100644 --- a/ServiceBuses/Wolverine/Cleipnir.Flows.Wolverine.Console/Program.cs +++ b/ServiceBuses/Wolverine/Cleipnir.Flows.Wolverine.Console/Program.cs @@ -14,6 +14,8 @@ public static async Task Main(string[] args) var host = await CreateHostBuilder([]).StartAsync(); var bus = host.Services.GetRequiredService(); var store = host.Services.GetRequiredService(); + + var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow)); var testSize = 1_000; for (var i = 0; i < testSize; i++) @@ -22,7 +24,7 @@ public static async Task Main(string[] args) while (true) { var succeeded = await store.GetSucceededFunctions( - nameof(SimpleFlow), + simpleFlowStoredType, DateTime.UtcNow.Ticks + 1_000_000 ).SelectAsync(f => f.Count); if (succeeded == testSize) diff --git a/Stores/Cleipnir.Flows.MySQL/Cleipnir.Flows.MySQL.csproj b/Stores/Cleipnir.Flows.MariaDB/Cleipnir.Flows.MariaDB.csproj similarity index 95% rename from Stores/Cleipnir.Flows.MySQL/Cleipnir.Flows.MySQL.csproj rename to Stores/Cleipnir.Flows.MariaDB/Cleipnir.Flows.MariaDB.csproj index faf48fa..59a3b39 100644 --- a/Stores/Cleipnir.Flows.MySQL/Cleipnir.Flows.MySQL.csproj +++ b/Stores/Cleipnir.Flows.MariaDB/Cleipnir.Flows.MariaDB.csproj @@ -24,7 +24,7 @@ - + diff --git a/Stores/Cleipnir.Flows.MySQL/FlowsModule.cs b/Stores/Cleipnir.Flows.MariaDB/FlowsModule.cs similarity index 70% rename from Stores/Cleipnir.Flows.MySQL/FlowsModule.cs rename to Stores/Cleipnir.Flows.MariaDB/FlowsModule.cs index f8c7245..981c661 100644 --- a/Stores/Cleipnir.Flows.MySQL/FlowsModule.cs +++ b/Stores/Cleipnir.Flows.MariaDB/FlowsModule.cs @@ -1,14 +1,14 @@ using System; using Cleipnir.Flows.AspNet; -using Cleipnir.ResilientFunctions.MySQL; +using Cleipnir.ResilientFunctions.MariaDb; using Cleipnir.ResilientFunctions.Storage; using Microsoft.Extensions.DependencyInjection; -namespace Cleipnir.Flows.MySQL; +namespace Cleipnir.Flows.MariaDB; public static class FlowsModule { - public static FlowsConfigurator UseMySqlStore( + public static FlowsConfigurator UseMariaSqlStore( this FlowsConfigurator configurator, Func connectionStringFunc, bool initializeDatabase = true, @@ -19,7 +19,7 @@ public static FlowsConfigurator UseMySqlStore( sp => { var connectionString = connectionStringFunc(sp); - var store = new MySqlFunctionStore(connectionString, tablePrefix); + var store = new MariaDbFunctionStore(connectionString, tablePrefix); if (initializeDatabase) store.Initialize().GetAwaiter().GetResult(); @@ -29,10 +29,10 @@ public static FlowsConfigurator UseMySqlStore( return configurator; } - public static FlowsConfigurator UseMySqlStore( + public static FlowsConfigurator UseMariaSqlStore( this FlowsConfigurator configurator, string connectionString, bool initializeDatabase = true, string tablePrefix = "flows" - ) => UseMySqlStore(configurator, _ => connectionString, initializeDatabase, tablePrefix); + ) => UseMariaSqlStore(configurator, _ => connectionString, initializeDatabase, tablePrefix); } \ No newline at end of file