Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/guide/messaging/transports/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,33 @@ public static ClearMqttTopic Handle(TriggerZero message)
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/MQTT/Wolverine.MQTT.Tests/ack_smoke_tests.cs#L84-L98' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_ack_mqtt_topic' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Authentication via OAuth2

Wolverine supports MQTT v5 OAuth2/JWT authentication by supplying a token callback and refresh interval when you configure
the transport. The callback returns raw token bytes (use UTF-8 encoding if your token is a string). When configured,
Wolverine sets the MQTT authentication method to `OAUTH2-JWT`, sends the initial token with the connect packet, and
re-authenticates on the configured refresh period while the client is connected.

::: info
You don't need to configure `AuthenticationMethod` and `AuthenticationData` by yourself. These are overriden when the `MqttJwtAuthenticationOptions` parameter is set.
:::

<!-- snippet: sample_mqtt_with_oauth-->
<a id='snippet-sample_mqtt_with_oauth'></a>
Minimal configuration example:
```cs
var builder = Host.CreateApplicationBuilder();

builder.UseWolverine(opts =>
{
opts.UseMqtt(
mqtt => mqtt.WithClientOptions(client => client.WithTcpServer("broker")),
new MqttJwtAuthenticationOptions(
async () => Encoding.UTF8.GetBytes(await GetJwtTokenAsync()),
30.Minutes()));
});
```

## Interoperability

::: tip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage inco
envelope.MessageType = _topic.MessageTypeName;
envelope.TopicName = incoming.Topic;

foreach (var property in incoming.UserProperties)
if (incoming.UserProperties?.Count > 0)
{
EnvelopeSerializer.ReadDataElement(envelope, property.Name, property.Value);
foreach (var property in incoming.UserProperties)
{
EnvelopeSerializer.ReadDataElement(envelope, property.Name, property.Value);
}
}
}
}
50 changes: 49 additions & 1 deletion src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;
using Timer = System.Timers.Timer;

namespace Wolverine.MQTT.Internals;

Expand All @@ -19,6 +20,7 @@ public class MqttTransport : TransportBase<MqttTopic>, IAsyncDisposable
private ImHashMap<string, MqttListener> _topicListeners = ImHashMap<string, MqttListener>.Empty;
private bool _subscribed;
private ILogger<MqttTransport> _logger;
private Timer? _jwtTokenRefreshTimer;

public static string TopicForUri(Uri uri)
{
Expand Down Expand Up @@ -63,9 +65,15 @@ public override async ValueTask InitializeAsync(IWolverineRuntime runtime)
Client = mqttFactory.CreateManagedMqttClient(logger);

Options.ClientOptions.ProtocolVersion = MqttProtocolVersion.V500;
if (JwtAuthenticationOptions is not null)
{
Options.ClientOptions.AuthenticationMethod = "OAUTH2-JWT";
Options.ClientOptions.AuthenticationData = await JwtAuthenticationOptions.GetTokenCallBack();
}

Client.ConnectedAsync += onClientConnected;
Client.DisconnectedAsync += onClientDisconnected;
await Client.StartAsync(Options);

foreach (var endpoint in Topics)
{
endpoint.Compile(runtime);
Expand Down Expand Up @@ -93,6 +101,44 @@ private Task receiveAsync(MqttApplicationMessageReceivedEventArgs arg)
return Task.CompletedTask;
}
}

private Task onClientConnected(MqttClientConnectedEventArgs arg)
{
if (arg.ConnectResult.ResultCode != MqttClientConnectResultCode.Success)
{
return Task.CompletedTask;
}

if (JwtAuthenticationOptions == null)
{
return Task.CompletedTask;
}

_jwtTokenRefreshTimer = new Timer(JwtAuthenticationOptions.RefreshPeriod);
_jwtTokenRefreshTimer.Elapsed += async (sender, args) => await RefreshToken(sender, args);
_jwtTokenRefreshTimer.Start();
return Task.CompletedTask;

async Task RefreshToken(object? sender, System.Timers.ElapsedEventArgs e)
{
if (Client.IsConnected)
{
await Client.InternalClient.SendExtendedAuthenticationExchangeDataAsync(
new MqttExtendedAuthenticationExchangeData()
{
AuthenticationData = await JwtAuthenticationOptions!.GetTokenCallBack(),
ReasonCode = MQTTnet.Protocol.MqttAuthenticateReasonCode.ReAuthenticate
});
}
}
}

private Task onClientDisconnected(MqttClientDisconnectedEventArgs arg)
{
_jwtTokenRefreshTimer?.Stop();
_jwtTokenRefreshTimer?.Dispose();
return Task.CompletedTask;
}

internal bool tryFindListener(string topicName, out MqttListener listener)
{
Expand All @@ -111,6 +157,7 @@ internal bool tryFindListener(string topicName, out MqttListener listener)
}

internal IManagedMqttClient Client { get; private set; }
internal MqttJwtAuthenticationOptions? JwtAuthenticationOptions { get; set; }

public ManagedMqttClientOptions Options { get; set; } = new ManagedMqttClientOptions
{ ClientOptions = new MqttClientOptions() };
Expand All @@ -127,6 +174,7 @@ public async ValueTask DisposeAsync()
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (Client is not null)
await Client.StopAsync();
_jwtTokenRefreshTimer?.Dispose();
}
catch (ObjectDisposedException)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Wolverine.MQTT;

public record MqttJwtAuthenticationOptions(Func<Task<byte[]>> GetTokenCallBack, TimeSpan RefreshPeriod);
20 changes: 13 additions & 7 deletions src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ internal static MqttTransport MqttTransport(this WolverineOptions endpoints)
/// </summary>
/// <param name="options"></param>
/// <param name="configure"></param>
/// <param name="jwtAuthenticationOptions">Sets AuthenticationMethod to OAUTH-JWT and uses the callback to fetch a token.
/// When the configured period elapses, a new token is fetched and a ExtendedAuthenticationExchangeData with ReasonCode ReAuth is sent with the new token.
/// <returns></returns>
public static MqttTransportExpression UseMqtt(this WolverineOptions options,
Action<ManagedMqttClientOptionsBuilder> configure)
Action<ManagedMqttClientOptionsBuilder> configure,
MqttJwtAuthenticationOptions? jwtAuthenticationOptions = null)
{
var transport = options.MqttTransport();
var builder = new ManagedMqttClientOptionsBuilder();
configure(builder);

transport.Options = builder.Build();
transport.JwtAuthenticationOptions = jwtAuthenticationOptions;

return new MqttTransportExpression(transport, options);
}
Expand All @@ -44,12 +48,16 @@ public static MqttTransportExpression UseMqtt(this WolverineOptions options,
/// </summary>
/// <param name="options"></param>
/// <param name="mqttOptions"></param>
/// <param name="jwtAuthenticationOptions">Sets AuthenticationMethod to OAUTH-JWT and uses the callback to fetch a token.
/// When the configured period elapses, a new token is fetched and a ExtendedAuthenticationExchangeData with ReasonCode ReAuth is sent with the new token.
/// <returns></returns>
public static MqttTransportExpression UseMqtt(this WolverineOptions options, ManagedMqttClientOptions mqttOptions)
public static MqttTransportExpression UseMqtt(this WolverineOptions options, ManagedMqttClientOptions mqttOptions,
MqttJwtAuthenticationOptions? jwtAuthenticationOptions = null)
{
var transport = options.MqttTransport();

transport.Options = mqttOptions;
transport.JwtAuthenticationOptions = jwtAuthenticationOptions;

return new MqttTransportExpression(transport, options);
}
Expand All @@ -65,10 +73,7 @@ public static MqttTransportExpression UseMqttWithLocalBroker(this WolverineOptio
{
return options.UseMqtt(builder =>
{
builder.WithClientOptions(opts =>
{
opts.WithTcpServer("127.0.0.1", port);
});
builder.WithClientOptions(opts => { opts.WithTcpServer("127.0.0.1", port); });
});
}

Expand Down Expand Up @@ -138,7 +143,8 @@ public static MqttSubscriberConfiguration ToMqttTopics(this IPublishToExpression
/// <param name="topicSource"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static MqttSubscriberConfiguration PublishMessagesToMqttTopic<T>(this WolverineOptions options, Func<T, string> topicSource)
public static MqttSubscriberConfiguration PublishMessagesToMqttTopic<T>(this WolverineOptions options,
Func<T, string> topicSource)
{
var transports = options.Transports;
var transport = transports.GetOrCreate<MqttTransport>();
Expand Down
Loading