Skip to content

Commit

Permalink
Remove EasyNetQ dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
fredimachado committed May 19, 2024
1 parent cb593f2 commit 84b4205
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 37 deletions.
1 change: 0 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
<PackageVersion Include="Ardalis.GuardClauses" Version="4.5.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
<PackageVersion Include="EasyNetQ" Version="7.5.5" />
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="21.2.0" />
<PackageVersion Include="FakeItEasy" Version="8.2.0" />
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
Expand Down
3 changes: 2 additions & 1 deletion src/Aspire/NCafe.ServiceDefaults/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicati
tracing.AddAspNetCoreInstrumentation()
// Uncomment the following line to enable gRPC instrumentation (requires the OpenTelemetry.Instrumentation.GrpcNetClient package)
//.AddGrpcClientInstrumentation()
.AddHttpClientInstrumentation();
.AddHttpClientInstrumentation()
.AddSource("NCafe.MessageBus.Consumer");
});

builder.AddOpenTelemetryExporters();
Expand Down
84 changes: 60 additions & 24 deletions src/Barista/NCafe.Barista.Api/MessageBus/OrdersConsumerService.cs
Original file line number Diff line number Diff line change
@@ -1,56 +1,92 @@
using EasyNetQ;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR;
using NCafe.Barista.Api.Hubs;
using NCafe.Barista.Domain.Commands;
using NCafe.Core.Commands;
using NCafe.Core.MessageBus.Events;
using NCafe.Infrastructure.MessageBus;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using System.Text.Json;
using System.Threading.Channels;

namespace NCafe.Barista.Api.MessageBus;

// TODO: Create abstraction to remove RabbitMq dependency
public class OrdersConsumerService(
IConnection connection,
ICommandDispatcher commandDispatcher,
IConfiguration configuration,
IHubContext<OrderHub> hubContext,
ILogger<OrdersConsumerService> logger) : IHostedService
{
private const string Queue = "barista_queue";
private const string Topic = "orders";
private const string Queue = "orders_queue";

private readonly IBus _bus = RabbitHutch.CreateBus(configuration.GetConnectionString("RabbitMq"));
private readonly IConnection _connection = connection;
private readonly ICommandDispatcher _commandDispatcher = commandDispatcher;
private readonly IHubContext<OrderHub> _hubContext = hubContext;
private readonly ILogger _logger = logger;

public async Task StartAsync(CancellationToken cancellationToken)
private static readonly ActivitySource ConsumerActivitySource = new("NCafe.MessageBus.Consumer");

private IModel _channel;

public Task StartAsync(CancellationToken cancellationToken)
{
var subscriptionId = Guid.NewGuid().ToString();
await _bus.PubSub.SubscribeAsync<OrderPlaced>(
subscriptionId,
MessageReceived,
config =>
{
config.WithDurable(true)
.WithQueueName(Queue)
.WithTopic(Topic);
},
cancellationToken);
_logger.LogInformation("Subscribing to {Queue}.", Queue);

_channel = _connection.CreateModel();

_channel.QueueDeclare(queue: Queue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += MessageReceived;
_channel.BasicConsume(queue: Queue,
autoAck: false,
consumer: consumer);

return Task.CompletedTask;
}

private async Task MessageReceived(OrderPlaced orderPlaced, CancellationToken cancellationToken)
private async void MessageReceived(object sender, BasicDeliverEventArgs e)
{
_logger.LogInformation("Received OrderPlaced Event: {@OrderPlaced}", orderPlaced);
// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#span-name
using var activity = ConsumerActivitySource.StartActivity($"{e.RoutingKey} receive", ActivityKind.Consumer);

await _commandDispatcher.DispatchAsync(new PlaceOrder(orderPlaced.Id, orderPlaced.ProductId, orderPlaced.Quantity));
var message = JsonSerializer.Deserialize<OrderPlaced>(e.Body.ToArray());

_logger.LogInformation("Received {MessageType} Message: {@Message}", message.GetType().Name, message);

activity?.SetTag("message", message);

// Common tags (RabbitMqHelper)
activity?.SetTag("messaging.system", "rabbitmq");
activity?.SetTag("messaging.destination_kind", "queue");
activity?.SetTag("messaging.destination", string.Empty);
activity?.SetTag("messaging.rabbitmq.routing_key", Queue);

// Dispatch domain command
await _commandDispatcher.DispatchAsync(new PlaceOrder(message.Id, message.ProductId, message.Quantity));

// Notify clients
await _hubContext.Clients.All.SendAsync(
"ReceiveOrder",
new Shared.Hubs.Order(orderPlaced.Id, orderPlaced.ProductId, orderPlaced.Quantity),
cancellationToken: cancellationToken);
new Shared.Hubs.Order(message.Id, message.ProductId, message.Quantity));

_channel.BasicAck(e.DeliveryTag, multiple: false);

_logger.LogInformation("Finished processing message.");
}

public Task StopAsync(CancellationToken cancellationToken)
{
_bus?.Dispose();
ConsumerActivitySource?.Dispose();
_channel?.Dispose();
_connection?.Dispose();

return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public async Task GivenOrderSaved_ShouldPublishToMessageBus()

// Assert
exception.ShouldBeNull();
A.CallTo(() => _publisher.Publish("orders", A<OrderPlaced>.That.Matches(o => o.ProductId == productId && o.Quantity == 1)))
A.CallTo(() => _publisher.Publish("orders_queue", A<OrderPlaced>.That.Matches(o => o.ProductId == productId && o.Quantity == 1)))
.MustHaveHappenedOnceExactly();
}
}
4 changes: 2 additions & 2 deletions src/Cashier/NCafe.Cashier.Domain/Commands/PlaceOrder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal sealed class PlaceOrderHandler(
private readonly IReadModelRepository<Product> _productReadRepository = productReadRepository;
private readonly IPublisher _publisher = publisher;

private const string Topic = "orders";
private const string Queue = "orders_queue";

public async Task HandleAsync(PlaceOrder command)
{
Expand All @@ -29,6 +29,6 @@ public async Task HandleAsync(PlaceOrder command)

await _repository.Save(order);

await _publisher.Publish(Topic, new OrderPlaced(order.Id, order.ProductId, order.Quantity));
await _publisher.Publish(Queue, new OrderPlaced(order.Id, order.ProductId, order.Quantity));
}
}
31 changes: 31 additions & 0 deletions src/Common/NCafe.Infrastructure/MessageBus/RabbitMqHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using RabbitMQ.Client;
using System.Diagnostics;

namespace NCafe.Infrastructure.MessageBus;
internal static class RabbitMqHelper
{
public static IModel CreateModelAndDeclareTestQueue(IConnection connection, string queueName)
{
var channel = connection.CreateModel();

channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

return channel;
}

public static void AddMessagingTags(Activity activity, string exchangeName, string queueName)
{
// These tags are added demonstrating the semantic conventions of the OpenTelemetry messaging specification
// See:
// * https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#messaging-attributes
// * https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/rabbitmq.md
activity?.SetTag("messaging.system", "rabbitmq");
activity?.SetTag("messaging.destination_kind", "queue");
activity?.SetTag("messaging.destination", exchangeName);
activity?.SetTag("messaging.rabbitmq.routing_key", queueName);
}
}
30 changes: 23 additions & 7 deletions src/Common/NCafe.Infrastructure/MessageBus/RabbitMqPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
using EasyNetQ;
using Microsoft.Extensions.Configuration;
using NCafe.Core.MessageBus;
using NCafe.Core.MessageBus;
using RabbitMQ.Client;
using System.Diagnostics;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace NCafe.Infrastructure.MessageBus;

internal class RabbitMqPublisher(IConfiguration configuration) : IPublisher
internal class RabbitMqPublisher(IConnection connection, ILogger<RabbitMqPublisher> logger) : IPublisher
{
private readonly IBus _bus = RabbitHutch.CreateBus(configuration.GetConnectionString("RabbitMq"));
private readonly IConnection _connection = connection;
private readonly ILogger _logger = logger;

public async Task Publish<T>(string topicName, T message) where T : class, IBusMessage
public Task Publish<T>(string queueName, T message) where T : class, IBusMessage
{
await _bus.PubSub.PublishAsync(message, topicName);
using var channel = RabbitMqHelper.CreateModelAndDeclareTestQueue(_connection, queueName);

RabbitMqHelper.AddMessagingTags(Activity.Current, string.Empty, queueName);

var body = JsonSerializer.SerializeToUtf8Bytes(message);

channel.BasicPublish(exchange: string.Empty,
routingKey: queueName,
basicProperties: null,
body: body);

_logger.LogInformation("Published {MessageType} Message: {@Message}", message.GetType().Name, message);

return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="EasyNetQ" />
<PackageReference Include="Aspire.RabbitMQ.Client" />
<PackageReference Include="EventStore.Client.Grpc.Streams" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
Expand Down

0 comments on commit 84b4205

Please sign in to comment.