From fcfbf0bbe8a2960dae24e3fe8a34e1b48010f6ba Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 5 Oct 2024 10:40:42 +0200 Subject: [PATCH] Implemented distributed newsletter flow sample --- .../C_NewsletterSender/Distributed/Example.cs | 56 +++++++++++++++++++ .../Distributed/NewsletterChildFlow.cs | 40 +++++++++++++ .../Distributed/NewsletterParentFlow.cs | 37 ++++++++++++ .../C_NewsletterSender/Example.cs | 31 +++++----- .../C_NewsletterSender/NewsletterFlow.cs | 2 +- .../Program.cs | 2 +- 6 files changed, 153 insertions(+), 15 deletions(-) create mode 100644 Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/Example.cs create mode 100644 Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterChildFlow.cs create mode 100644 Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterParentFlow.cs diff --git a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/Example.cs b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/Example.cs new file mode 100644 index 0000000..c1859e8 --- /dev/null +++ b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/Example.cs @@ -0,0 +1,56 @@ +using Cleipnir.Flows.AspNet; +using Cleipnir.ResilientFunctions.PostgreSQL; + +namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender.Distributed; + +public static class Example +{ + public static async Task Perform() + { + var connStr = "Server=localhost;Database=flows;User Id=postgres;Password=Pa55word!; Include Error Detail=true;"; + await DatabaseHelper.CreateDatabaseIfNotExists(connStr); + var store = new PostgreSqlFunctionStore(connStr); + await store.Initialize(); + await store.TruncateTables(); + + var replicas = Enumerable + .Range(0, 3) + .Select(child => + { + var serviceCollection = new ServiceCollection(); + serviceCollection.AddFlows( + c => c + .UseStore(store) + .WithOptions(new Options(unhandledExceptionHandler: Console.WriteLine)) + .RegisterFlow( + flowsFactory: sp => + new NewsletterChildFlows( + sp.GetRequiredService(), + options: new Options(maxParallelRetryInvocations: 1) + ), + flowFactory: sp => new NewsletterChildFlow(sp.GetRequiredService(), child) + ) + .RegisterFlowsAutomatically() + ); + + var sp = serviceCollection.BuildServiceProvider(); + var __ = sp.GetRequiredService(); + + return sp.GetRequiredService(); + }) + .ToList(); + + await replicas[0].Run( + instanceId: "2023-10", + new MailAndRecipients( + [ + new("Peter Hansen", "peter@gmail.com"), + new("Ulla Hansen", "ulla@gmail.com"), + new("Heino Hansen", "heino@gmail.com") + ], + Subject: "To message queue or not?", + Content: "Message Queues are omnipresent but do we really need them in our enterprise architectures? ..." + ) + ); + } +} \ No newline at end of file diff --git a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterChildFlow.cs b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterChildFlow.cs new file mode 100644 index 0000000..301a7e8 --- /dev/null +++ b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterChildFlow.cs @@ -0,0 +1,40 @@ +using Cleipnir.ResilientFunctions.Domain; +using MailKit.Net.Smtp; +using MimeKit; +using MimeKit.Text; + +namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender.Distributed; + +public class NewsletterChildFlow(NewsletterParentFlows parentFlows, int child) : Flow +{ + public override async Task Run(NewsletterChildWork work) + { + Console.WriteLine($"Starting child: {child}"); + + var (recipients, subject, content) = work.MailAndRecipients; + using var client = new SmtpClient(); + await client.ConnectAsync("mail.smtpbucket.com", 8025); + + for (var index = 0; index < recipients.Count; index++) + { + var recipient = recipients[index]; + var message = new MimeMessage(); + message.To.Add(new MailboxAddress(recipient.Name, recipient.Address)); + message.From.Add(new MailboxAddress("Cleipnir.NET", "newsletter@cleipnir.net")); + + message.Subject = subject; + message.Body = new TextPart(TextFormat.Html) { Text = content }; + await client.SendAsync(message); + } + + await parentFlows.SendMessage( + work.Parent.Instance, + new NewsletterParentFlow.EmailsSent(work.Child), + idempotencyKey: work.Child.ToString() + ); + + Console.WriteLine($"Finishing child: {child}"); + } +} + +public record NewsletterChildWork(int Child, MailAndRecipients MailAndRecipients, FlowId Parent); \ No newline at end of file diff --git a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterParentFlow.cs b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterParentFlow.cs new file mode 100644 index 0000000..d9166eb --- /dev/null +++ b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Distributed/NewsletterParentFlow.cs @@ -0,0 +1,37 @@ +using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Helpers; +using Cleipnir.ResilientFunctions.Reactive.Extensions; + +namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender.Distributed; + +public class NewsletterParentFlow(NewsletterChildFlows childFlows) : Flow +{ + public override async Task Run(MailAndRecipients param) + { + Console.WriteLine("Started NewsletterParentFlow"); + + var (recipients, subject, content) = param; + + var bulkWork = recipients + .Split(3) + .Select(emails => new MailAndRecipients(emails, subject, content)) + .Select((mailAndRecipients, child) => new NewsletterChildWork(child, mailAndRecipients, Workflow.FlowId)) + .Select(work => + new BulkWork( + Instance: $"{Workflow.FlowId.Instance}_Child{work.Child}", + work + ) + ); + + await childFlows.BulkSchedule(bulkWork); + + await Messages + .OfType() + .Take(3) + .Completion(maxWait: TimeSpan.FromMinutes(30)); + + Console.WriteLine("Finished NewsletterParentFlow"); + } + + public record EmailsSent(int Child); +} \ No newline at end of file diff --git a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Example.cs b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Example.cs index 4db3316..fc4ec10 100644 --- a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Example.cs +++ b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/Example.cs @@ -1,4 +1,5 @@ -using Cleipnir.ResilientFunctions.PostgreSQL; +using Cleipnir.Flows.AspNet; +using Cleipnir.ResilientFunctions.PostgreSQL; namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender; @@ -7,28 +8,32 @@ public static class Example public static async Task Perform() { var connStr = "Server=localhost;Database=flows;User Id=postgres;Password=Pa55word!; Include Error Detail=true;"; - var flowStore = new PostgreSqlFunctionStore(connStr); - await flowStore.Initialize(); - await flowStore.TruncateTables(); + await DatabaseHelper.CreateDatabaseIfNotExists(connStr); + var store = new PostgreSqlFunctionStore(connStr); + await store.Initialize(); + await store.TruncateTables(); var serviceCollection = new ServiceCollection(); serviceCollection.AddTransient(); - var flowsContainer = new FlowsContainer( - flowStore, - serviceCollection.BuildServiceProvider(), - new Options(unhandledExceptionHandler: Console.WriteLine) + + serviceCollection.AddFlows( + c => c + .UseStore(store) + .WithOptions(new Options(unhandledExceptionHandler: Console.WriteLine)) + .RegisterFlowsAutomatically() ); + + var sp = serviceCollection.BuildServiceProvider(); + var flows = sp.GetRequiredService(); - var flows = new NewsletterFlows(flowsContainer); await flows.Run( - "2023-10", + instanceId: "2023-10", new MailAndRecipients( - new List - { + [ new("Peter Hansen", "peter@gmail.com"), new("Ulla Hansen", "ulla@gmail.com"), new("Heino Hansen", "heino@gmail.com") - }, + ], Subject: "To message queue or not?", Content: "Message Queues are omnipresent but do we really need them in our enterprise architectures? ..." ) diff --git a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/NewsletterFlow.cs b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/NewsletterFlow.cs index 6c03d2d..6384fc9 100644 --- a/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/NewsletterFlow.cs +++ b/Samples/Cleipnir.Flows.Sample.Presentation/C_NewsletterSender/NewsletterFlow.cs @@ -34,7 +34,7 @@ public override async Task Run(MailAndRecipients mailAndRecipients) public record EmailAddress(string Name, string Address); public record MailAndRecipients( - List Recipients, + IReadOnlyList Recipients, string Subject, string Content ); \ No newline at end of file diff --git a/Samples/Cleipnir.Flows.Sample.Presentation/Program.cs b/Samples/Cleipnir.Flows.Sample.Presentation/Program.cs index e7093cf..eb0a14a 100644 --- a/Samples/Cleipnir.Flows.Sample.Presentation/Program.cs +++ b/Samples/Cleipnir.Flows.Sample.Presentation/Program.cs @@ -11,7 +11,7 @@ private static async Task Main(string[] args) .Enrich.FromLogContext() .CreateLogger(); - await Task.CompletedTask; + await C_NewsletterSender.Distributed.Example.Perform(); Console.ReadLine(); }