Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Fixed cached messages are cleared from channels with unread messages when accessed
offline. [[#2083]](https://github.com/GetStream/stream-chat-flutter/issues/2083)
- Fixed RetryQueue skipping messages due to premature removal from the
queue. [[#2308]](https://github.com/GetStream/stream-chat-flutter/pull/2308)

✅ Added

Expand Down
48 changes: 17 additions & 31 deletions packages/stream_chat/lib/src/client/retry_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import 'dart:async';

import 'package:collection/collection.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/stream_chat.dart';

/// The retry queue associated to a channel.
Expand All @@ -14,7 +13,6 @@ class RetryQueue {
}) : client = channel.client {
_retryPolicy = client.retryPolicy;
_listenConnectionRecovered();
_listenMessageEvents();
}

/// The channel of this queue.
Expand All @@ -41,19 +39,6 @@ class RetryQueue {
}).addTo(_compositeSubscription);
}

void _listenMessageEvents() {
channel.on().where((event) => event.message != null).listen((event) {
final message = event.message!;
final containsMessage = _messageQueue.containsMessage(message);
if (!containsMessage) return;

if (message.state.isCompleted) {
logger?.info('Removing sent message from queue : ${message.id}');
return _messageQueue.removeMessage(message);
}
}).addTo(_compositeSubscription);
}

/// Add a list of messages.
void add(Iterable<Message> messages) {
assert(
Expand All @@ -62,9 +47,7 @@ class RetryQueue {
);

// Filter out messages that are already in the queue.
final messagesToAdd = messages.where((it) {
return !_messageQueue.containsMessage(it);
});
final messagesToAdd = messages.whereNot(_messageQueue.containsMessage);

// If there are no messages to add, return.
if (messagesToAdd.isEmpty) return;
Expand Down Expand Up @@ -106,7 +89,7 @@ class RetryQueue {
channel.state?.updateMessage(message);
} finally {
// remove the message from the queue after it's handled.
_messageQueue.removeFirst();
_messageQueue.removeMessage(message);
}
}

Expand All @@ -130,8 +113,8 @@ class RetryQueue {
final date2 = _getMessageDate(m2);

if (date1 == null && date2 == null) return 0;
if (date1 == null) return -1;
if (date2 == null) return 1;
if (date1 == null) return 1;
if (date2 == null) return -1;
return date1.compareTo(date2);
}

Expand All @@ -148,18 +131,21 @@ class RetryQueue {
}

extension on HeapPriorityQueue<Message> {
void removeMessage(Message message) {
final list = toUnorderedList();
final index = list.indexWhere((it) => it.id == message.id);
if (index == -1) return;
final element = list[index];
remove(element);
bool removeMessage(Message message) {
for (final element in unorderedElements) {
if (element.id == message.id) {
return remove(element);
}
}

return false;
}

bool containsMessage(Message message) {
final list = toUnorderedList();
final index = list.indexWhere((it) => it.id == message.id);
if (index == -1) return false;
return true;
for (final element in unorderedElements) {
if (element.id == message.id) return true;
}

return false;
}
}
1 change: 1 addition & 0 deletions packages/stream_chat/lib/stream_chat.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export 'package:uuid/uuid.dart';
export 'src/client/channel.dart';
export 'src/client/client.dart';
export 'src/client/key_stroke_handler.dart';
export 'src/client/retry_policy.dart';
export 'src/core/api/attachment_file_uploader.dart';
export 'src/core/api/requests.dart';
export 'src/core/api/responses.dart';
Expand Down
1 change: 0 additions & 1 deletion packages/stream_chat/test/src/client/channel_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// ignore_for_file: lines_longer_than_80_chars

import 'package:mocktail/mocktail.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:test/test.dart';

Expand Down
1 change: 0 additions & 1 deletion packages/stream_chat/test/src/client/retry_queue_test.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'package:mocktail/mocktail.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/src/client/retry_queue.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:test/test.dart';
Expand Down
6 changes: 6 additions & 0 deletions sample_app/lib/app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ StreamChatClient buildStreamChatClient(String apiKey) {
apiKey,
logLevel: logLevel,
logHandlerFunction: _sampleAppLogHandler,
retryPolicy: RetryPolicy(
maxRetryAttempts: 3,
shouldRetry: (client, attempt, error) {
return error is StreamChatNetworkError && error.isRetriable;
},
),
//baseURL: 'http://<local-ip>:3030',
//baseWsUrl: 'ws://<local-ip>:8800',
)..chatPersistenceClient = chatPersistentClient;
Expand Down