Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions src/Persistence/MySql/MySqlTests/Agents/control_queue_tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using MySqlConnector;
using Shouldly;
using Wolverine;
using Wolverine.MySql;
using Wolverine.RDBMS;
using Wolverine.Tracking;

namespace MySqlTests.Agents;

[Collection("mysql")]
public class control_queue_tests : MySqlContext, IAsyncLifetime
{
private static IHost _sender;
private static IHost _receiver;
private static Uri _receiverUri;

public async Task InitializeAsync()
{
await dropControlSchema();
}

public async Task DisposeAsync()
{
await _sender.StopAsync();
_sender.Dispose();
await _receiver.StopAsync();
_receiver.Dispose();
}

private static async Task dropControlSchema()
{
await using var conn = new MySqlConnection(Servers.MySqlConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "DROP DATABASE IF EXISTS `mysqlcontrol`";
await cmd.ExecuteNonQueryAsync();
await conn.CloseAsync();

_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithMySql(Servers.MySqlConnectionString, "mysqlcontrol");
opts.ServiceName = "Sender";
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithMySql(Servers.MySqlConnectionString, "mysqlcontrol");
opts.ServiceName = "Receiver";
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

var nodeId = _receiver.GetRuntime().Options.UniqueNodeId;
_receiverUri = new Uri($"dbcontrol://{nodeId}");
}

[Fact]
public async Task control_queue_table_should_exist()
{
await using var conn = new MySqlConnection(Servers.MySqlConnectionString);
await conn.OpenAsync();

await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'mysqlcontrol'
AND table_name LIKE 'wolverine%'";

var tables = new List<string>();
await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
tables.Add(reader.GetString(0));
}
await conn.CloseAsync();

tables.ShouldContain(DatabaseConstants.ControlQueueTableName);
}

[Fact]
public async Task send_message_from_one_to_another()
{
var tracked = await _sender.TrackActivity()
.AlsoTrack(_receiver)
.Timeout(10.Seconds())
.ExecuteAndWaitAsync(m => m.EndpointFor(_receiverUri).SendAsync(new MySqlCommand(10)));

tracked.Sent.RecordsInOrder().Single(x => x.Envelope.Message?.GetType() == typeof(MySqlCommand)).ServiceName
.ShouldBe("Sender");
tracked.Received.RecordsInOrder().Single(x => x.Envelope.Message?.GetType() == typeof(MySqlCommand))
.ServiceName
.ShouldBe("Receiver");
}

[Fact]
public async Task request_reply_message_from_one_to_another()
{
var (tracked, result) = await _sender.TrackActivity()
.AlsoTrack(_receiver)
.Timeout(120.Seconds())
.InvokeAndWaitAsync<MySqlResult>(new MySqlQuery(13), _receiverUri);

result.Number.ShouldBe(13);


tracked.Sent.RecordsInOrder().Single(x => x.Envelope.Message.GetType() == typeof(MySqlQuery)).ServiceName
.ShouldBe("Sender");
tracked.Received.RecordsInOrder().Single(x => x.Envelope.Message.GetType() == typeof(MySqlQuery)).ServiceName
.ShouldBe("Receiver");

tracked.Sent.RecordsInOrder().Single(x => x.Envelope.Message.GetType() == typeof(MySqlResult)).ServiceName
.ShouldBe("Receiver");
tracked.Received.RecordsInOrder().Single(x => x.Envelope.Message.GetType() == typeof(MySqlResult))
.ServiceName
.ShouldBe("Sender");
}
}

public record MySqlQuery(int Number);

public record MySqlResult(int Number);

public record MySqlCommand(int Number);

public static class MySqlQueryMessageHandler
{
public static MySqlResult Handle(MySqlQuery query)
{
return new MySqlResult(query.Number);
}

public static void Handle(MySqlCommand command)
{
Debug.WriteLine($"Got command {command.Number}");
}

public static void Handle(MySqlResult result)
{
Debug.WriteLine($"Got result {result.Number}");
}
}
30 changes: 30 additions & 0 deletions src/Persistence/MySql/MySqlTests/Agents/leader_election.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using IntegrationTests;
using MySqlConnector;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.MySql;
using Xunit.Abstractions;

namespace MySqlTests.Agents;

public class leader_election : LeadershipElectionCompliance
{
public leader_election(ITestOutputHelper output) : base(output)
{
}

protected override void configureNode(WolverineOptions opts)
{
opts.PersistMessagesWithMySql(Servers.MySqlConnectionString, "registry");
}

protected override async Task beforeBuildingHost()
{
await using var conn = new MySqlConnection(Servers.MySqlConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "DROP DATABASE IF EXISTS `registry`";
await cmd.ExecuteNonQueryAsync();
await conn.CloseAsync();
}
}
40 changes: 40 additions & 0 deletions src/Persistence/MySql/MySqlTests/Agents/node_persistence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using IntegrationTests;
using Microsoft.Extensions.Logging.Abstractions;
using MySqlConnector;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.MySql;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS;
using Wolverine.RDBMS.Sagas;

namespace MySqlTests.Agents;

[Collection("mysql")]
public class node_persistence : NodePersistenceCompliance
{
protected override async Task<IMessageStore> buildCleanMessageStore()
{
await using var conn = new MySqlConnection(Servers.MySqlConnectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "DROP DATABASE IF EXISTS `nodes`";
await cmd.ExecuteNonQueryAsync();
await conn.CloseAsync();

var dataSource = MySqlDataSourceFactory.Create(Servers.MySqlConnectionString);
var settings = new DatabaseSettings
{
ConnectionString = Servers.MySqlConnectionString,
SchemaName = "nodes",
Role = MessageStoreRole.Main
};

var database = new MySqlMessageStore(settings, new DurabilitySettings(), dataSource,
NullLogger<MySqlMessageStore>.Instance, Array.Empty<SagaTableDefinition>());

await database.Admin.MigrateAsync();

return database;
}
}
Loading