Skip to content

Commit

Permalink
Implemented distributed newsletter flow sample
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Oct 5, 2024
1 parent a19b96c commit fcfbf0b
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Cleipnir.Flows.AspNet;
using Cleipnir.ResilientFunctions.PostgreSQL;

namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender.Distributed;

public static class Example
{
public static async Task Perform()
{
var connStr = "Server=localhost;Database=flows;User Id=postgres;Password=Pa55word!; Include Error Detail=true;";
await DatabaseHelper.CreateDatabaseIfNotExists(connStr);
var store = new PostgreSqlFunctionStore(connStr);
await store.Initialize();
await store.TruncateTables();

var replicas = Enumerable
.Range(0, 3)
.Select(child =>
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddFlows(
c => c
.UseStore(store)
.WithOptions(new Options(unhandledExceptionHandler: Console.WriteLine))
.RegisterFlow<NewsletterChildFlow, NewsletterChildFlows>(
flowsFactory: sp =>
new NewsletterChildFlows(
sp.GetRequiredService<FlowsContainer>(),
options: new Options(maxParallelRetryInvocations: 1)
),
flowFactory: sp => new NewsletterChildFlow(sp.GetRequiredService<NewsletterParentFlows>(), child)
)
.RegisterFlowsAutomatically()
);

var sp = serviceCollection.BuildServiceProvider();
var __ = sp.GetRequiredService<NewsletterChildFlows>();

return sp.GetRequiredService<NewsletterParentFlows>();
})
.ToList();

await replicas[0].Run(
instanceId: "2023-10",
new MailAndRecipients(
[
new("Peter Hansen", "[email protected]"),
new("Ulla Hansen", "[email protected]"),
new("Heino Hansen", "[email protected]")
],
Subject: "To message queue or not?",
Content: "Message Queues are omnipresent but do we really need them in our enterprise architectures? ..."
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Cleipnir.ResilientFunctions.Domain;
using MailKit.Net.Smtp;
using MimeKit;
using MimeKit.Text;

namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender.Distributed;

public class NewsletterChildFlow(NewsletterParentFlows parentFlows, int child) : Flow<NewsletterChildWork>
{
public override async Task Run(NewsletterChildWork work)
{
Console.WriteLine($"Starting child: {child}");

var (recipients, subject, content) = work.MailAndRecipients;
using var client = new SmtpClient();
await client.ConnectAsync("mail.smtpbucket.com", 8025);

for (var index = 0; index < recipients.Count; index++)
{
var recipient = recipients[index];
var message = new MimeMessage();
message.To.Add(new MailboxAddress(recipient.Name, recipient.Address));
message.From.Add(new MailboxAddress("Cleipnir.NET", "[email protected]"));

message.Subject = subject;
message.Body = new TextPart(TextFormat.Html) { Text = content };
await client.SendAsync(message);
}

await parentFlows.SendMessage(
work.Parent.Instance,
new NewsletterParentFlow.EmailsSent(work.Child),
idempotencyKey: work.Child.ToString()
);

Console.WriteLine($"Finishing child: {child}");
}
}

public record NewsletterChildWork(int Child, MailAndRecipients MailAndRecipients, FlowId Parent);
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Reactive.Extensions;

namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender.Distributed;

public class NewsletterParentFlow(NewsletterChildFlows childFlows) : Flow<MailAndRecipients>
{
public override async Task Run(MailAndRecipients param)
{
Console.WriteLine("Started NewsletterParentFlow");

var (recipients, subject, content) = param;

var bulkWork = recipients
.Split(3)
.Select(emails => new MailAndRecipients(emails, subject, content))
.Select((mailAndRecipients, child) => new NewsletterChildWork(child, mailAndRecipients, Workflow.FlowId))
.Select(work =>
new BulkWork<NewsletterChildWork>(
Instance: $"{Workflow.FlowId.Instance}_Child{work.Child}",
work
)
);

await childFlows.BulkSchedule(bulkWork);

await Messages
.OfType<EmailsSent>()
.Take(3)
.Completion(maxWait: TimeSpan.FromMinutes(30));

Console.WriteLine("Finished NewsletterParentFlow");
}

public record EmailsSent(int Child);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cleipnir.ResilientFunctions.PostgreSQL;
using Cleipnir.Flows.AspNet;
using Cleipnir.ResilientFunctions.PostgreSQL;

namespace Cleipnir.Flows.Sample.Presentation.C_NewsletterSender;

Expand All @@ -7,28 +8,32 @@ public static class Example
public static async Task Perform()
{
var connStr = "Server=localhost;Database=flows;User Id=postgres;Password=Pa55word!; Include Error Detail=true;";
var flowStore = new PostgreSqlFunctionStore(connStr);
await flowStore.Initialize();
await flowStore.TruncateTables();
await DatabaseHelper.CreateDatabaseIfNotExists(connStr);
var store = new PostgreSqlFunctionStore(connStr);
await store.Initialize();
await store.TruncateTables();

var serviceCollection = new ServiceCollection();
serviceCollection.AddTransient<NewsletterFlow>();
var flowsContainer = new FlowsContainer(
flowStore,
serviceCollection.BuildServiceProvider(),
new Options(unhandledExceptionHandler: Console.WriteLine)

serviceCollection.AddFlows(
c => c
.UseStore(store)
.WithOptions(new Options(unhandledExceptionHandler: Console.WriteLine))
.RegisterFlowsAutomatically()
);

var sp = serviceCollection.BuildServiceProvider();
var flows = sp.GetRequiredService<NewsletterFlows>();

var flows = new NewsletterFlows(flowsContainer);
await flows.Run(
"2023-10",
instanceId: "2023-10",
new MailAndRecipients(
new List<EmailAddress>
{
[
new("Peter Hansen", "[email protected]"),
new("Ulla Hansen", "[email protected]"),
new("Heino Hansen", "[email protected]")
},
],
Subject: "To message queue or not?",
Content: "Message Queues are omnipresent but do we really need them in our enterprise architectures? ..."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public override async Task Run(MailAndRecipients mailAndRecipients)

public record EmailAddress(string Name, string Address);
public record MailAndRecipients(
List<EmailAddress> Recipients,
IReadOnlyList<EmailAddress> Recipients,
string Subject,
string Content
);
2 changes: 1 addition & 1 deletion Samples/Cleipnir.Flows.Sample.Presentation/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ private static async Task Main(string[] args)
.Enrich.FromLogContext()
.CreateLogger();

await Task.CompletedTask;
await C_NewsletterSender.Distributed.Example.Perform();

Console.ReadLine();
}
Expand Down

0 comments on commit fcfbf0b

Please sign in to comment.