Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 17 additions & 30 deletions packages/stream_chat/lib/src/client/retry_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class RetryQueue {
}) : client = channel.client {
_retryPolicy = client.retryPolicy;
_listenConnectionRecovered();
_listenMessageEvents();
}

/// The channel of this queue.
Expand All @@ -41,19 +40,6 @@ class RetryQueue {
}).addTo(_compositeSubscription);
}

void _listenMessageEvents() {
channel.on().where((event) => event.message != null).listen((event) {
final message = event.message!;
final containsMessage = _messageQueue.containsMessage(message);
if (!containsMessage) return;

if (message.state.isCompleted) {
logger?.info('Removing sent message from queue : ${message.id}');
return _messageQueue.removeMessage(message);
}
}).addTo(_compositeSubscription);
}

/// Add a list of messages.
void add(Iterable<Message> messages) {
assert(
Expand All @@ -62,9 +48,7 @@ class RetryQueue {
);

// Filter out messages that are already in the queue.
final messagesToAdd = messages.where((it) {
return !_messageQueue.containsMessage(it);
});
final messagesToAdd = messages.whereNot(_messageQueue.containsMessage);

// If there are no messages to add, return.
if (messagesToAdd.isEmpty) return;
Expand Down Expand Up @@ -106,7 +90,7 @@ class RetryQueue {
channel.state?.updateMessage(message);
} finally {
// remove the message from the queue after it's handled.
_messageQueue.removeFirst();
_messageQueue.removeMessage(message);
}
}

Expand All @@ -130,8 +114,8 @@ class RetryQueue {
final date2 = _getMessageDate(m2);

if (date1 == null && date2 == null) return 0;
if (date1 == null) return -1;
if (date2 == null) return 1;
if (date1 == null) return 1;
if (date2 == null) return -1;
return date1.compareTo(date2);
}

Expand All @@ -148,18 +132,21 @@ class RetryQueue {
}

extension on HeapPriorityQueue<Message> {
void removeMessage(Message message) {
final list = toUnorderedList();
final index = list.indexWhere((it) => it.id == message.id);
if (index == -1) return;
final element = list[index];
remove(element);
bool removeMessage(Message message) {
for (final element in unorderedElements) {
if (element.id == message.id) {
return remove(element);
}
}

return false;
}

bool containsMessage(Message message) {
final list = toUnorderedList();
final index = list.indexWhere((it) => it.id == message.id);
if (index == -1) return false;
return true;
for (final element in unorderedElements) {
if (element.id == message.id) return true;
}

return false;
}
}
Loading