Skip to content

Commit

Permalink
Upgraded to latest Cleipnir.ResilientFunctions
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Nov 15, 2024
1 parent c9ec57c commit 7deb711
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Cleipnir.ResilientFunctions\Stores\MySQL\Cleipnir.ResilientFunctions.MySQL\Cleipnir.ResilientFunctions.MySQL.csproj" />
<ProjectReference Include="..\Cleipnir.ResilientFunctions\Stores\MariaDB\Cleipnir.ResilientFunctions.MariaDB\Cleipnir.ResilientFunctions.MariaDB.csproj" />
<ProjectReference Include="..\Cleipnir.ResilientFunctions\Stores\PostgreSQL\Cleipnir.ResilientFunctions.PostgreSQL\Cleipnir.ResilientFunctions.PostgreSQL.csproj" />
<ProjectReference Include="..\Cleipnir.ResilientFunctions\Stores\SqlServer\Cleipnir.ResilientFunctions.SqlServer\Cleipnir.ResilientFunctions.SqlServer.csproj" />

Expand Down
2 changes: 1 addition & 1 deletion Cleipnir.Flows.Tests.AspNet/IntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task SunshineScenarioPostgres()
[TestMethod]
public async Task SunshineScenarioMySql()
{
var store = await MySqlHelper.CreateAndInitializeMySqlStore();
var store = await MariaDbHelper.CreateAndInitializeMySqlStore();
const string hostUrl = "http://localhost:5003";
var startFlowUrl = $"{hostUrl}/startFlow";
await SunshineScenario(hostUrl, startFlowUrl, store);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using Cleipnir.ResilientFunctions.MySQL;
using Cleipnir.ResilientFunctions.MariaDb;
using MySqlConnector;

namespace Cleipnir.Flows.Tests.AspNet
{
public static class MySqlHelper
public static class MariaDbHelper
{
public static string ConnectionString { get; }
public static Func<Task<MySqlConnection>> ConnFunc { get; set; }

static MySqlHelper()
static MariaDbHelper()
{
ConnectionString =
Environment.GetEnvironmentVariable("Cleipnir.RFunctions.MySQL.Tests.ConnectionString")
Expand Down Expand Up @@ -40,11 +40,11 @@ public static void CreateDatabase()
}
}

public static async Task<MySqlFunctionStore> CreateAndInitializeMySqlStore()
public static async Task<MariaDbFunctionStore> CreateAndInitializeMySqlStore()
{
CreateDatabase();

var store = new MySqlFunctionStore(ConnectionString, tablePrefix: "MySqlFlows");
var store = new MariaDbFunctionStore(ConnectionString, tablePrefix: "MySqlFlows");
await store.Initialize();
await store.TruncateTables();
return store;
Expand Down
9 changes: 5 additions & 4 deletions Cleipnir.Flows.Tests/Flows/OptionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ await Should.ThrowAsync<InvocationSuspendedException>(
public async Task FlowNameCanBeSpecifiedFromTheOutside()
{
var serviceCollection = new ServiceCollection();

var store = new InMemoryFunctionStore();
var storedType = await store.TypeStore.InsertOrGetStoredType("SomeOtherFlowName");

serviceCollection.AddFlows(c => c
.UseInMemoryStore()
.UseInMemoryStore(store)
.WithOptions(new Options(messagesDefaultMaxWaitForCompletion: TimeSpan.MaxValue))
.RegisterFlow<SimpleFlow, SimpleFlows>(
flowsFactory: sp => new SimpleFlows(
Expand All @@ -70,8 +72,7 @@ public async Task FlowNameCanBeSpecifiedFromTheOutside()
var sp = serviceCollection.BuildServiceProvider();
var flows = sp.GetRequiredService<SimpleFlows>();
await flows.Run("Id");
var store = sp.GetRequiredService<IFunctionStore>();
var sf = await store.GetFunction(new FlowId("SomeOtherFlowName", "Id"));
var sf = await store.GetFunction(new StoredId(storedType, Instance: "Id"));
sf.ShouldNotBeNull();
sf.Status.ShouldBe(Status.Succeeded);
}
Expand Down
16 changes: 8 additions & 8 deletions Cleipnir.Flows.sln
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Stores", "Stores", "{9A4E176B-52F2-4375-87DB-AB66F0C37646}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions.MySQL", "Cleipnir.ResilientFunctions\Stores\MySQL\Cleipnir.ResilientFunctions.MySQL\Cleipnir.ResilientFunctions.MySQL.csproj", "{B6C02093-1375-48B1-A4D7-E6F4A0581506}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions.SqlServer", "Cleipnir.ResilientFunctions\Stores\SqlServer\Cleipnir.ResilientFunctions.SqlServer\Cleipnir.ResilientFunctions.SqlServer.csproj", "{BC926BA5-072D-4098-9792-712C21C61189}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.MySQL", "Stores\Cleipnir.Flows.MySQL\Cleipnir.Flows.MySQL.csproj", "{4C623459-7BB1-4FF6-8812-85371EA0DE04}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.MariaDB", "Stores\Cleipnir.Flows.MariaDB\Cleipnir.Flows.MariaDB.csproj", "{4C623459-7BB1-4FF6-8812-85371EA0DE04}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.PostgresSql", "Stores\Cleipnir.Flows.PostgresSql\Cleipnir.Flows.PostgresSql.csproj", "{EE5A28B6-AD40-4462-8008-8B54F21E82E0}"
EndProject
Expand Down Expand Up @@ -77,6 +75,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.NServiceBus.
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.Flows.NServiceBus.Tests", "ServiceBuses\NServiceBus\Cleipnir.Flows.NServiceBus.Tests\Cleipnir.Flows.NServiceBus.Tests.csproj", "{7A7ABDAB-3EFD-4532-B794-64C999B77C72}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cleipnir.ResilientFunctions.MariaDB", "Cleipnir.ResilientFunctions\Stores\MariaDB\Cleipnir.ResilientFunctions.MariaDB\Cleipnir.ResilientFunctions.MariaDB.csproj", "{027072F7-E753-4CF4-8156-C1B502377FF7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -130,10 +130,6 @@ Global
{785F156E-024C-4D30-83D0-C077B3CCC0CE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{785F156E-024C-4D30-83D0-C077B3CCC0CE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{785F156E-024C-4D30-83D0-C077B3CCC0CE}.Release|Any CPU.Build.0 = Release|Any CPU
{B6C02093-1375-48B1-A4D7-E6F4A0581506}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B6C02093-1375-48B1-A4D7-E6F4A0581506}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B6C02093-1375-48B1-A4D7-E6F4A0581506}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B6C02093-1375-48B1-A4D7-E6F4A0581506}.Release|Any CPU.Build.0 = Release|Any CPU
{BC926BA5-072D-4098-9792-712C21C61189}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BC926BA5-072D-4098-9792-712C21C61189}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BC926BA5-072D-4098-9792-712C21C61189}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -190,6 +186,10 @@ Global
{7A7ABDAB-3EFD-4532-B794-64C999B77C72}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7A7ABDAB-3EFD-4532-B794-64C999B77C72}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7A7ABDAB-3EFD-4532-B794-64C999B77C72}.Release|Any CPU.Build.0 = Release|Any CPU
{027072F7-E753-4CF4-8156-C1B502377FF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{027072F7-E753-4CF4-8156-C1B502377FF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{027072F7-E753-4CF4-8156-C1B502377FF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{027072F7-E753-4CF4-8156-C1B502377FF7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{BC470FDE-839E-4B2C-8C56-8B20DEEF2CCD} = {EAD350F1-8EF2-48AC-98EE-A5E739F15787}
Expand All @@ -202,7 +202,6 @@ Global
{19BAF8B6-41E4-45F5-B2E2-97B60A9B6372} = {5E8A4831-5FF4-4BEC-9334-847943DA9517}
{9A4E176B-52F2-4375-87DB-AB66F0C37646} = {7204001F-410C-41DB-9892-696E06D0703D}
{785F156E-024C-4D30-83D0-C077B3CCC0CE} = {9A4E176B-52F2-4375-87DB-AB66F0C37646}
{B6C02093-1375-48B1-A4D7-E6F4A0581506} = {9A4E176B-52F2-4375-87DB-AB66F0C37646}
{BC926BA5-072D-4098-9792-712C21C61189} = {9A4E176B-52F2-4375-87DB-AB66F0C37646}
{4C623459-7BB1-4FF6-8812-85371EA0DE04} = {8D89DB84-E378-40AF-8CA3-16673868D494}
{EE5A28B6-AD40-4462-8008-8B54F21E82E0} = {8D89DB84-E378-40AF-8CA3-16673868D494}
Expand All @@ -221,5 +220,6 @@ Global
{AF7E47FD-B1C2-4B90-A3A3-1D24ED9668E3} = {0872461B-3FC1-4659-AE2D-80109CDA3F5E}
{0280F37A-575B-4F08-9699-D8BECFCA0A3C} = {AF7E47FD-B1C2-4B90-A3A3-1D24ED9668E3}
{7A7ABDAB-3EFD-4532-B794-64C999B77C72} = {AF7E47FD-B1C2-4B90-A3A3-1D24ED9668E3}
{027072F7-E753-4CF4-8156-C1B502377FF7} = {9A4E176B-52F2-4375-87DB-AB66F0C37646}
EndGlobalSection
EndGlobal
4 changes: 2 additions & 2 deletions Cleipnir.Flows/AspNet/FlowsModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public FlowsConfigurator(IServiceCollection services)
Services = services;
}

public FlowsConfigurator UseInMemoryStore()
public FlowsConfigurator UseInMemoryStore(InMemoryFunctionStore? store = null)
{
Services.AddSingleton<IFunctionStore>(new InMemoryFunctionStore());
Services.AddSingleton<IFunctionStore>(store ?? new InMemoryFunctionStore());
return this;
}

Expand Down
17 changes: 9 additions & 8 deletions Cleipnir.Flows/Flows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Storage;
using Microsoft.Extensions.DependencyInjection;

namespace Cleipnir.Flows;
Expand All @@ -18,7 +19,7 @@ public interface IBaseFlows
public static abstract Type FlowType { get; }

public Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null) where T : notnull;
public Task<IReadOnlyList<FlowInstance>> GetInstances(Status? status = null);
public Task<IReadOnlyList<StoredInstance>> GetInstances(Status? status = null);
}

public abstract class BaseFlows<TFlow> : IBaseFlows where TFlow : notnull
Expand All @@ -29,7 +30,7 @@ public abstract class BaseFlows<TFlow> : IBaseFlows where TFlow : notnull

protected BaseFlows(FlowsContainer flowsContainer) => FlowsContainer = flowsContainer;

public abstract Task<IReadOnlyList<FlowInstance>> GetInstances(Status? status = null);
public abstract Task<IReadOnlyList<StoredInstance>> GetInstances(Status? status = null);

private static Action<TFlow, Workflow> CreateWorkflowSetter()
{
Expand Down Expand Up @@ -103,7 +104,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, Options? options =
=> _registration.GetState<TState>(functionInstanceId);

public MessageWriter MessageWriter(string instanceId)
=> _registration.MessageWriters.For(instanceId);
=> _registration.MessageWriters.For(instanceId.ToFlowInstance());

public Task Run(string instanceId)
=> _registration.Invoke(instanceId);
Expand All @@ -119,7 +120,7 @@ public override Task RouteMessage<T>(T message, string correlationId, string? id

public Task BulkSchedule(IEnumerable<FlowInstance> instanceIds) => _registration.BulkSchedule(instanceIds);

public override Task<IReadOnlyList<FlowInstance>> GetInstances(Status? status = null)
public override Task<IReadOnlyList<StoredInstance>> GetInstances(Status? status = null)
=> _registration.GetInstances(status);

public Task<Finding> SendMessage<T>(FlowInstance flowInstance, T message, bool create = true, string? idempotencyKey = null) where T : notnull
Expand Down Expand Up @@ -159,7 +160,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, Options? options =
=> _registration.GetState<TState>(functionInstanceId);

public MessageWriter MessageWriter(string instanceId)
=> _registration.MessageWriters.For(instanceId);
=> _registration.MessageWriters.For(instanceId.ToFlowInstance());

public Task Run(string instanceId, TParam param)
=> _registration.Invoke(instanceId, param);
Expand All @@ -181,7 +182,7 @@ TimeSpan delay
public override Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null)
=> _registration.RouteMessage(message, correlationId, idempotencyKey);

public override Task<IReadOnlyList<FlowInstance>> GetInstances(Status? status = null)
public override Task<IReadOnlyList<StoredInstance>> GetInstances(Status? status = null)
=> _registration.GetInstances(status);

public Task<Finding> SendMessage<T>(FlowInstance flowInstance, T message, string? idempotencyKey = null) where T : notnull
Expand Down Expand Up @@ -214,7 +215,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, Options? options =
=> _registration.ControlPanel(instanceId);

public MessageWriter MessageWriter(string instanceId)
=> _registration.MessageWriters.For(instanceId);
=> _registration.MessageWriters.For(instanceId.ToFlowInstance());

public Task<TResult> Run(string instanceId, TParam param)
=> _registration.Invoke(instanceId, param);
Expand All @@ -239,7 +240,7 @@ TimeSpan delay
public override Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null)
=> _registration.RouteMessage(message, correlationId, idempotencyKey);

public override Task<IReadOnlyList<FlowInstance>> GetInstances(Status? status = null)
public override Task<IReadOnlyList<StoredInstance>> GetInstances(Status? status = null)
=> _registration.GetInstances(status);

public Task<Finding> SendMessage<T>(FlowInstance flowInstance, T message, string? idempotencyKey = null) where T : notnull
Expand Down
2 changes: 1 addition & 1 deletion Cleipnir.ResilientFunctions
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ It works for both RPC and Message-based communication.
## Getting Started
To get started simply perform the following three steps in an ASP.NET service:

Firstly, install the Cleipnir.Flows nuget package (using either Postgres, SqlServer or MySQL as persistence layer). I.e.
Firstly, install the Cleipnir.Flows nuget package (using either Postgres, SqlServer or MariaDB as persistence layer). I.e.
```powershell
Install-Package Cleipnir.Flows.Postgres
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ public static async Task Main(string[] args)
var host = await CreateHostBuilder([]).StartAsync();
var bus = host.Services.GetRequiredService<IBus>();
var store = host.Services.GetRequiredService<IFunctionStore>();

var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow));

var testSize = 1_000;
for (var i = 0; i < testSize; i++)
await bus.Publish(new MyMessage(i.ToString()));

while (true)
{
var succeeded = await store.GetSucceededFunctions(
nameof(SimpleFlow),
simpleFlowStoredType,
DateTime.UtcNow.Ticks + 1_000_000
).SelectAsync(f => f.Count);
if (succeeded == testSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ public static async Task Main(string[] args)
var bus = host.Services.GetRequiredService<IMessageSession>();
var store = host.Services.GetRequiredService<IFunctionStore>();

var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow));

var testSize = 1_000;
for (var i = 0; i < testSize; i++)
await bus.Publish(new MyMessage(i.ToString()));

while (true)
{
var succeeded = await store.GetSucceededFunctions(
nameof(SimpleFlow),
simpleFlowStoredType,
DateTime.UtcNow.Ticks + 1_000_000
).SelectAsync(f => f.Count);
if (succeeded == testSize)
Expand Down
4 changes: 3 additions & 1 deletion ServiceBuses/Rebus/Cleipnir.Flows.Rebus.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ public static async Task Main(string[] args)
var bus = host.Services.GetRequiredService<IBus>();
var store = host.Services.GetRequiredService<IFunctionStore>();

var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow));

for (var i = 0; i < 1_000; i++)
await bus.SendLocal(new MyMessage(i.ToString()));

while (true)
{
var succeeded = await store.GetSucceededFunctions(
nameof(SimpleFlow),
simpleFlowStoredType,
DateTime.UtcNow.Ticks + 1_000_000
).SelectAsync(f => f.Count);
if (succeeded == 1_000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public static async Task Main(string[] args)
var host = await CreateHostBuilder([]).StartAsync();
var bus = host.Services.GetRequiredService<IMessageBus>();
var store = host.Services.GetRequiredService<IFunctionStore>();

var simpleFlowStoredType = await store.TypeStore.InsertOrGetStoredType(nameof(SimpleFlow));

var testSize = 1_000;
for (var i = 0; i < testSize; i++)
Expand All @@ -22,7 +24,7 @@ public static async Task Main(string[] args)
while (true)
{
var succeeded = await store.GetSucceededFunctions(
nameof(SimpleFlow),
simpleFlowStoredType,
DateTime.UtcNow.Ticks + 1_000_000
).SelectAsync(f => f.Count);
if (succeeded == testSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<ItemGroup>
<ProjectReference Include="../../Cleipnir.Flows/Cleipnir.Flows.csproj" />
<ProjectReference Include="../../Cleipnir.ResilientFunctions/Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/Cleipnir.ResilientFunctions.MySQL.csproj" />
<ProjectReference Include="../../Cleipnir.ResilientFunctions/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/Cleipnir.ResilientFunctions.MariaDB.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using System;
using Cleipnir.Flows.AspNet;
using Cleipnir.ResilientFunctions.MySQL;
using Cleipnir.ResilientFunctions.MariaDb;
using Cleipnir.ResilientFunctions.Storage;
using Microsoft.Extensions.DependencyInjection;

namespace Cleipnir.Flows.MySQL;
namespace Cleipnir.Flows.MariaDB;

public static class FlowsModule
{
public static FlowsConfigurator UseMySqlStore(
public static FlowsConfigurator UseMariaSqlStore(
this FlowsConfigurator configurator,
Func<IServiceProvider, string> connectionStringFunc,
bool initializeDatabase = true,
Expand All @@ -19,7 +19,7 @@ public static FlowsConfigurator UseMySqlStore(
sp =>
{
var connectionString = connectionStringFunc(sp);
var store = new MySqlFunctionStore(connectionString, tablePrefix);
var store = new MariaDbFunctionStore(connectionString, tablePrefix);
if (initializeDatabase)
store.Initialize().GetAwaiter().GetResult();

Expand All @@ -29,10 +29,10 @@ public static FlowsConfigurator UseMySqlStore(
return configurator;
}

public static FlowsConfigurator UseMySqlStore(
public static FlowsConfigurator UseMariaSqlStore(
this FlowsConfigurator configurator,
string connectionString,
bool initializeDatabase = true,
string tablePrefix = "flows"
) => UseMySqlStore(configurator, _ => connectionString, initializeDatabase, tablePrefix);
) => UseMariaSqlStore(configurator, _ => connectionString, initializeDatabase, tablePrefix);
}

0 comments on commit 7deb711

Please sign in to comment.