From d2863726e3a0146c21fe13cd393336018390ae33 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Thu, 17 Jul 2025 02:04:37 +0200 Subject: [PATCH 1/3] fix(llc): fix skipped message retries due to premature queue removal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the RetryQueue listened to channel message events and removed messages from the queue when they were successfully sent. This introduced a race condition: when the retry loop reached `finally`, it would call `removeFirst()` assuming the message was still in the queue — but the event listener had already removed it. As a result, the next (unrelated) message would be incorrectly removed and skipped. This change removes the `_listenMessageEvents()` logic and ensures that message removal is done explicitly inside `_processQueue()` using `removeMessage(message)`. This guarantees that only the message currently being retried is removed. Additionally: - Optimized `add()` to filter out duplicates more efficiently. - Improved `containsMessage` and `removeMessage` using `unorderedElements`. - Fixed `_byDate()` comparator to ensure null dates are sorted to the bottom. --- .../lib/src/client/retry_queue.dart | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/packages/stream_chat/lib/src/client/retry_queue.dart b/packages/stream_chat/lib/src/client/retry_queue.dart index 43ad314d5b..0212b5384a 100644 --- a/packages/stream_chat/lib/src/client/retry_queue.dart +++ b/packages/stream_chat/lib/src/client/retry_queue.dart @@ -14,7 +14,6 @@ class RetryQueue { }) : client = channel.client { _retryPolicy = client.retryPolicy; _listenConnectionRecovered(); - _listenMessageEvents(); } /// The channel of this queue. @@ -41,19 +40,6 @@ class RetryQueue { }).addTo(_compositeSubscription); } - void _listenMessageEvents() { - channel.on().where((event) => event.message != null).listen((event) { - final message = event.message!; - final containsMessage = _messageQueue.containsMessage(message); - if (!containsMessage) return; - - if (message.state.isCompleted) { - logger?.info('Removing sent message from queue : ${message.id}'); - return _messageQueue.removeMessage(message); - } - }).addTo(_compositeSubscription); - } - /// Add a list of messages. void add(Iterable messages) { assert( @@ -62,9 +48,7 @@ class RetryQueue { ); // Filter out messages that are already in the queue. - final messagesToAdd = messages.where((it) { - return !_messageQueue.containsMessage(it); - }); + final messagesToAdd = messages.whereNot(_messageQueue.containsMessage); // If there are no messages to add, return. if (messagesToAdd.isEmpty) return; @@ -106,7 +90,7 @@ class RetryQueue { channel.state?.updateMessage(message); } finally { // remove the message from the queue after it's handled. - _messageQueue.removeFirst(); + _messageQueue.removeMessage(message); } } @@ -130,8 +114,8 @@ class RetryQueue { final date2 = _getMessageDate(m2); if (date1 == null && date2 == null) return 0; - if (date1 == null) return -1; - if (date2 == null) return 1; + if (date1 == null) return 1; + if (date2 == null) return -1; return date1.compareTo(date2); } @@ -148,18 +132,21 @@ class RetryQueue { } extension on HeapPriorityQueue { - void removeMessage(Message message) { - final list = toUnorderedList(); - final index = list.indexWhere((it) => it.id == message.id); - if (index == -1) return; - final element = list[index]; - remove(element); + bool removeMessage(Message message) { + for (final element in unorderedElements) { + if (element.id == message.id) { + return remove(element); + } + } + + return false; } bool containsMessage(Message message) { - final list = toUnorderedList(); - final index = list.indexWhere((it) => it.id == message.id); - if (index == -1) return false; - return true; + for (final element in unorderedElements) { + if (element.id == message.id) return true; + } + + return false; } } From 56ecffa24dfb4523bc1c5e0b92eddee76844df13 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Thu, 17 Jul 2025 02:08:17 +0200 Subject: [PATCH 2/3] chore: update CHANGELOG.md --- packages/stream_chat/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/stream_chat/CHANGELOG.md b/packages/stream_chat/CHANGELOG.md index 5ab11ff624..3d4ffc6f21 100644 --- a/packages/stream_chat/CHANGELOG.md +++ b/packages/stream_chat/CHANGELOG.md @@ -4,6 +4,8 @@ - Fixed cached messages are cleared from channels with unread messages when accessed offline. [[#2083]](https://github.com/GetStream/stream-chat-flutter/issues/2083) +- Fixed RetryQueue skipping messages due to premature removal from the + queue. [[#2308]](https://github.com/GetStream/stream-chat-flutter/pull/2308) ✅ Added From dc7dc1577a1513b2e2eaf5bc98ba532743323a1c Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Thu, 17 Jul 2025 13:26:45 +0200 Subject: [PATCH 3/3] chore: export retry_policy --- packages/stream_chat/lib/src/client/retry_queue.dart | 1 - packages/stream_chat/lib/stream_chat.dart | 1 + packages/stream_chat/test/src/client/channel_test.dart | 1 - packages/stream_chat/test/src/client/retry_queue_test.dart | 1 - sample_app/lib/app.dart | 6 ++++++ 5 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/stream_chat/lib/src/client/retry_queue.dart b/packages/stream_chat/lib/src/client/retry_queue.dart index 0212b5384a..8675ee68b7 100644 --- a/packages/stream_chat/lib/src/client/retry_queue.dart +++ b/packages/stream_chat/lib/src/client/retry_queue.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:rxdart/rxdart.dart'; -import 'package:stream_chat/src/client/retry_policy.dart'; import 'package:stream_chat/stream_chat.dart'; /// The retry queue associated to a channel. diff --git a/packages/stream_chat/lib/stream_chat.dart b/packages/stream_chat/lib/stream_chat.dart index 1a6b1844d7..f296711b75 100644 --- a/packages/stream_chat/lib/stream_chat.dart +++ b/packages/stream_chat/lib/stream_chat.dart @@ -19,6 +19,7 @@ export 'package:uuid/uuid.dart'; export 'src/client/channel.dart'; export 'src/client/client.dart'; export 'src/client/key_stroke_handler.dart'; +export 'src/client/retry_policy.dart'; export 'src/core/api/attachment_file_uploader.dart'; export 'src/core/api/requests.dart'; export 'src/core/api/responses.dart'; diff --git a/packages/stream_chat/test/src/client/channel_test.dart b/packages/stream_chat/test/src/client/channel_test.dart index fb011ded70..eabb98f7c2 100644 --- a/packages/stream_chat/test/src/client/channel_test.dart +++ b/packages/stream_chat/test/src/client/channel_test.dart @@ -1,7 +1,6 @@ // ignore_for_file: lines_longer_than_80_chars import 'package:mocktail/mocktail.dart'; -import 'package:stream_chat/src/client/retry_policy.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:test/test.dart'; diff --git a/packages/stream_chat/test/src/client/retry_queue_test.dart b/packages/stream_chat/test/src/client/retry_queue_test.dart index befa28f3bd..1150c40b85 100644 --- a/packages/stream_chat/test/src/client/retry_queue_test.dart +++ b/packages/stream_chat/test/src/client/retry_queue_test.dart @@ -1,5 +1,4 @@ import 'package:mocktail/mocktail.dart'; -import 'package:stream_chat/src/client/retry_policy.dart'; import 'package:stream_chat/src/client/retry_queue.dart'; import 'package:stream_chat/stream_chat.dart'; import 'package:test/test.dart'; diff --git a/sample_app/lib/app.dart b/sample_app/lib/app.dart index 5e72b30825..2beadbcc93 100644 --- a/sample_app/lib/app.dart +++ b/sample_app/lib/app.dart @@ -265,6 +265,12 @@ StreamChatClient buildStreamChatClient(String apiKey) { apiKey, logLevel: logLevel, logHandlerFunction: _sampleAppLogHandler, + retryPolicy: RetryPolicy( + maxRetryAttempts: 3, + shouldRetry: (client, attempt, error) { + return error is StreamChatNetworkError && error.isRetriable; + }, + ), //baseURL: 'http://:3030', //baseWsUrl: 'ws://:8800', )..chatPersistenceClient = chatPersistentClient;