-
Notifications
You must be signed in to change notification settings - Fork 492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[v2 - bug fix] Queue MQTT C2D messages if callback not set #3336
Conversation
@@ -28,11 +28,13 @@ namespace Microsoft.Azure.Devices.Client.Transport | |||
internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable | |||
{ | |||
private const int ProtocolGatewayPort = 8883; | |||
private const int MessageQueueSize = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting the default queue size to 10. The client would always keep track of latest 10 messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect people are going to want to modify this value - why don't we expose this as a settable property?
Could we just ask users to set the callback before calling open? Or is this to handle a case other than explicit open? |
private async Task HandleReceivedMessageAsync(MqttApplicationMessageReceivedEventArgs receivedEventArgs) | ||
{ | ||
receivedEventArgs.AutoAcknowledge = false; | ||
string topic = receivedEventArgs.ApplicationMessage.Topic; | ||
|
||
if (topic.StartsWith(_deviceBoundMessagesTopic, StringComparison.InvariantCulture)) | ||
{ | ||
await HandleReceivedCloudToDeviceMessageAsync(receivedEventArgs).ConfigureAwait(false); | ||
await HandleReceivedCloudToDeviceMessageAsync(ProcessC2DMessage(receivedEventArgs)).ConfigureAwait(false); | ||
await receivedEventArgs.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments here explaining that messages are being added to a queue for later processing, and are being ack'ed immediately
Logging.Error(this, $"Failed to send the acknowledgement for a received cloud to device message {ex}"); ; | ||
{ | ||
Logging.Info(this, $"Queue size of {MessageQueueSize} for C2D messages has been reached, removing oldest queued C2D message. " + | ||
$"To avoid losing further messages, set SetIncomingMessageCallbackAsync() to process the messages."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add this into doc comments. Users don't usually look at logs until something really goes wrong.
I don't think the SDK lets you do anything until the client is opened. |
So we would have this same problem with twin/direct method messages, right? I'd prefer if we allow users to set the callbacks before opening the client, but that may be a more involved fix than we need right now. Because of that, I won't push back if this is the approach we take.
|
This is technically a workaround for service's current behavior. Ideally, a c2d message would have been published with a QoS of 1, and the client would choose to not ack it and service would redeliver them at some frequency. Once the user sets the callback, those messages could then be ack'ed. Alternatively, yeah, we could ask users to set the callback before opening the clients. I think that is a reasonable ask but yeah, it will require a bigger change. Direct methods inherently require there to be an opened client (or, soon to-be opened client), so it isn't that big of an issue there. Deliver once, if there is no ack then fail immediately. Nothing gets queued. |
Hello,
Let's imagine I want only communicate with Device Twin. I have to open connection to communicate with DeviceTwin. It means, behind-the-scenes, I always open such a 'black hole' that swallows all my telemetry messages only because I've opened connection to access to Device Twin. It's not what I expected, it is "side effect", it is not obvious, it obliges me to set a callback even I don't want to listen IoT Hub messages right now.
I'm sorry in advance, maybe I didn't understand your point of view correctly but it sounds for me a little bit as an "override of Is it possible to open a connection but do not consume messages from IoT Hub if MQTT is used? Is it possible to check how it implemented in other SDKs (i.e. Python or C SDK). It would be great to have some consistency among SDKs. |
Thanks for sharing your inputs, @bastyuchenko , we appreciate it.
IoT Hub is a message queue that will store messages for an offline client if the client had previously connected with
I agree that both approaches can have their cons:
Could you explain what you meant by these two statements? I'm not sure if the above explanation satisfies your question here, but if it doesn't, I'd like to understand the question a bit better.
The
The .NET SDK v1 also has this internal message queue in place, similar to the implementation in this PR. |
Thanks for the detailed explanation, @abhipsaMisra
yes, above explanation fully satisfies my question here. @tmahmood-microsoft,
All messages that are stored in "local offline queue" you implemented in this PR will be marked as |
Hi @bastyuchenko, all the messages that are stored in "local offline queue" implemented in this PR will be marked as Completed in IoT Hub. |
iothub/device/src/TransportSettings/IotHubClientMqttSettings.cs
Outdated
Show resolved
Hide resolved
iothub/device/src/TransportSettings/IotHubClientMqttSettings.cs
Outdated
Show resolved
Hide resolved
iothub/device/src/TransportSettings/IotHubClientMqttSettings.cs
Outdated
Show resolved
Hide resolved
iothub/device/src/TransportSettings/IotHubClientMqttSettings.cs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add some tests for this:
- c2d messages aren't lost when you reconnect with no callback set
- c2d messages are redelivered at a later point when the client subscribes to the callback
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run |
Azure Pipelines successfully started running 1 pipeline(s). |
Fix for #3335
In v2, if incoming message callback has not been set, messages are still received because cleanSession = false (default) and the client subscribes to topic from last session. This causes messages to be received but the callback has not been set and the message is eventually completed, since MQTT messages cannot be rejected or abandoned.
This PR adds changes where we queue the received messages if callback has not been set and processes them later when callback has been set.