diff --git a/docs/guide/messaging/transports/mqtt.md b/docs/guide/messaging/transports/mqtt.md
index 5a011506a..9f8608ff6 100644
--- a/docs/guide/messaging/transports/mqtt.md
+++ b/docs/guide/messaging/transports/mqtt.md
@@ -357,6 +357,33 @@ public static ClearMqttTopic Handle(TriggerZero message)
snippet source | anchor
+## Authentication via OAuth2
+
+Wolverine supports MQTT v5 OAuth2/JWT authentication by supplying a token callback and refresh interval when you configure
+the transport. The callback returns raw token bytes (use UTF-8 encoding if your token is a string). When configured,
+Wolverine sets the MQTT authentication method to `OAUTH2-JWT`, sends the initial token with the connect packet, and
+re-authenticates on the configured refresh period while the client is connected.
+
+::: info
+You don't need to configure `AuthenticationMethod` and `AuthenticationData` by yourself. These are overriden when the `MqttJwtAuthenticationOptions` parameter is set.
+:::
+
+
+
+Minimal configuration example:
+```cs
+var builder = Host.CreateApplicationBuilder();
+
+builder.UseWolverine(opts =>
+{
+ opts.UseMqtt(
+ mqtt => mqtt.WithClientOptions(client => client.WithTcpServer("broker")),
+ new MqttJwtAuthenticationOptions(
+ async () => Encoding.UTF8.GetBytes(await GetJwtTokenAsync()),
+ 30.Minutes()));
+});
+```
+
## Interoperability
::: tip
diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs
index ef458988a..42af23bfb 100644
--- a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttEnvelopeMapper.cs
@@ -107,9 +107,12 @@ public void MapIncomingToEnvelope(Envelope envelope, MqttApplicationMessage inco
envelope.MessageType = _topic.MessageTypeName;
envelope.TopicName = incoming.Topic;
- foreach (var property in incoming.UserProperties)
+ if (incoming.UserProperties?.Count > 0)
{
- EnvelopeSerializer.ReadDataElement(envelope, property.Name, property.Value);
+ foreach (var property in incoming.UserProperties)
+ {
+ EnvelopeSerializer.ReadDataElement(envelope, property.Name, property.Value);
+ }
}
}
}
\ No newline at end of file
diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs
index 2dd8b6187..d9bd84459 100644
--- a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs
@@ -9,6 +9,7 @@
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;
+using Timer = System.Timers.Timer;
namespace Wolverine.MQTT.Internals;
@@ -19,6 +20,7 @@ public class MqttTransport : TransportBase, IAsyncDisposable
private ImHashMap _topicListeners = ImHashMap.Empty;
private bool _subscribed;
private ILogger _logger;
+ private Timer? _jwtTokenRefreshTimer;
public static string TopicForUri(Uri uri)
{
@@ -63,9 +65,15 @@ public override async ValueTask InitializeAsync(IWolverineRuntime runtime)
Client = mqttFactory.CreateManagedMqttClient(logger);
Options.ClientOptions.ProtocolVersion = MqttProtocolVersion.V500;
+ if (JwtAuthenticationOptions is not null)
+ {
+ Options.ClientOptions.AuthenticationMethod = "OAUTH2-JWT";
+ Options.ClientOptions.AuthenticationData = await JwtAuthenticationOptions.GetTokenCallBack();
+ }
+ Client.ConnectedAsync += onClientConnected;
+ Client.DisconnectedAsync += onClientDisconnected;
await Client.StartAsync(Options);
-
foreach (var endpoint in Topics)
{
endpoint.Compile(runtime);
@@ -93,6 +101,44 @@ private Task receiveAsync(MqttApplicationMessageReceivedEventArgs arg)
return Task.CompletedTask;
}
}
+
+ private Task onClientConnected(MqttClientConnectedEventArgs arg)
+ {
+ if (arg.ConnectResult.ResultCode != MqttClientConnectResultCode.Success)
+ {
+ return Task.CompletedTask;
+ }
+
+ if (JwtAuthenticationOptions == null)
+ {
+ return Task.CompletedTask;
+ }
+
+ _jwtTokenRefreshTimer = new Timer(JwtAuthenticationOptions.RefreshPeriod);
+ _jwtTokenRefreshTimer.Elapsed += async (sender, args) => await RefreshToken(sender, args);
+ _jwtTokenRefreshTimer.Start();
+ return Task.CompletedTask;
+
+ async Task RefreshToken(object? sender, System.Timers.ElapsedEventArgs e)
+ {
+ if (Client.IsConnected)
+ {
+ await Client.InternalClient.SendExtendedAuthenticationExchangeDataAsync(
+ new MqttExtendedAuthenticationExchangeData()
+ {
+ AuthenticationData = await JwtAuthenticationOptions!.GetTokenCallBack(),
+ ReasonCode = MQTTnet.Protocol.MqttAuthenticateReasonCode.ReAuthenticate
+ });
+ }
+ }
+ }
+
+ private Task onClientDisconnected(MqttClientDisconnectedEventArgs arg)
+ {
+ _jwtTokenRefreshTimer?.Stop();
+ _jwtTokenRefreshTimer?.Dispose();
+ return Task.CompletedTask;
+ }
internal bool tryFindListener(string topicName, out MqttListener listener)
{
@@ -111,6 +157,7 @@ internal bool tryFindListener(string topicName, out MqttListener listener)
}
internal IManagedMqttClient Client { get; private set; }
+ internal MqttJwtAuthenticationOptions? JwtAuthenticationOptions { get; set; }
public ManagedMqttClientOptions Options { get; set; } = new ManagedMqttClientOptions
{ ClientOptions = new MqttClientOptions() };
@@ -127,6 +174,7 @@ public async ValueTask DisposeAsync()
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (Client is not null)
await Client.StopAsync();
+ _jwtTokenRefreshTimer?.Dispose();
}
catch (ObjectDisposedException)
{
diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttJwtAuthenticationOptions.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttJwtAuthenticationOptions.cs
new file mode 100644
index 000000000..7ef5b9ed8
--- /dev/null
+++ b/src/Transports/MQTT/Wolverine.MQTT/MqttJwtAuthenticationOptions.cs
@@ -0,0 +1,3 @@
+namespace Wolverine.MQTT;
+
+public record MqttJwtAuthenticationOptions(Func> GetTokenCallBack, TimeSpan RefreshPeriod);
\ No newline at end of file
diff --git a/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs b/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
index f7c6e98de..ee7d9dd60 100644
--- a/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
+++ b/src/Transports/MQTT/Wolverine.MQTT/MqttTransportExtensions.cs
@@ -26,15 +26,19 @@ internal static MqttTransport MqttTransport(this WolverineOptions endpoints)
///
///
///
+ /// Sets AuthenticationMethod to OAUTH-JWT and uses the callback to fetch a token.
+ /// When the configured period elapses, a new token is fetched and a ExtendedAuthenticationExchangeData with ReasonCode ReAuth is sent with the new token.
///
public static MqttTransportExpression UseMqtt(this WolverineOptions options,
- Action configure)
+ Action configure,
+ MqttJwtAuthenticationOptions? jwtAuthenticationOptions = null)
{
var transport = options.MqttTransport();
var builder = new ManagedMqttClientOptionsBuilder();
configure(builder);
transport.Options = builder.Build();
+ transport.JwtAuthenticationOptions = jwtAuthenticationOptions;
return new MqttTransportExpression(transport, options);
}
@@ -44,12 +48,16 @@ public static MqttTransportExpression UseMqtt(this WolverineOptions options,
///
///
///
+ /// Sets AuthenticationMethod to OAUTH-JWT and uses the callback to fetch a token.
+ /// When the configured period elapses, a new token is fetched and a ExtendedAuthenticationExchangeData with ReasonCode ReAuth is sent with the new token.
///
- public static MqttTransportExpression UseMqtt(this WolverineOptions options, ManagedMqttClientOptions mqttOptions)
+ public static MqttTransportExpression UseMqtt(this WolverineOptions options, ManagedMqttClientOptions mqttOptions,
+ MqttJwtAuthenticationOptions? jwtAuthenticationOptions = null)
{
var transport = options.MqttTransport();
transport.Options = mqttOptions;
+ transport.JwtAuthenticationOptions = jwtAuthenticationOptions;
return new MqttTransportExpression(transport, options);
}
@@ -65,10 +73,7 @@ public static MqttTransportExpression UseMqttWithLocalBroker(this WolverineOptio
{
return options.UseMqtt(builder =>
{
- builder.WithClientOptions(opts =>
- {
- opts.WithTcpServer("127.0.0.1", port);
- });
+ builder.WithClientOptions(opts => { opts.WithTcpServer("127.0.0.1", port); });
});
}
@@ -138,7 +143,8 @@ public static MqttSubscriberConfiguration ToMqttTopics(this IPublishToExpression
///
///
///
- public static MqttSubscriberConfiguration PublishMessagesToMqttTopic(this WolverineOptions options, Func topicSource)
+ public static MqttSubscriberConfiguration PublishMessagesToMqttTopic(this WolverineOptions options,
+ Func topicSource)
{
var transports = options.Transports;
var transport = transports.GetOrCreate();