Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## Upcoming

🐞 Fixed

- Fixed `ChannelState.memberCount`, `ChannelState.config` and `ChannelState.extraData` getting reset
on first load.

## 9.17.0

🐞 Fixed
Expand Down
58 changes: 30 additions & 28 deletions packages/stream_chat/lib/src/client/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2107,23 +2107,19 @@ class ChannelClientState {
ChannelClientState(
this._channel,
ChannelState channelState,
) : _debouncedUpdatePersistenceChannelState = debounce(
(ChannelState state) {
final persistenceClient = _channel._client.chatPersistenceClient;
return persistenceClient?.updateChannelState(state);
},
const Duration(seconds: 1),
) {
) {
_retryQueue = RetryQueue(
channel: _channel,
logger: _channel.client.detachedLogger(
'⟳ (${generateHash([_channel.cid])})',
),
);

_checkExpiredAttachmentMessages(channelState);

_channelStateController = BehaviorSubject.seeded(channelState);
// Update the persistence storage with the seeded channel state.
_debouncedUpdatePersistenceChannelState.call([channelState]);

_checkExpiredAttachmentMessages(channelState);

_listenTypingEvents();

Expand Down Expand Up @@ -2199,20 +2195,11 @@ class ChannelClientState {

_listenChannelPushPreferenceUpdated();

_channel._client.chatPersistenceClient
?.getChannelThreads(_channel.cid!)
.then((threads) {
_threads = threads;
}).then((_) {
_channel._client.chatPersistenceClient
?.getChannelStateByCid(_channel.cid!)
.then((state) {
// Replacing the persistence state members with the latest
// `channelState.members` as they may have changes over the time.
updateChannelState(state.copyWith(members: channelState.members));
retryFailedMessages();
});
});
final persistenceClient = _channel.client.chatPersistenceClient;
persistenceClient?.getChannelThreads(_channel.cid!).then((threads) {
// Load all the threads for the channel from the offline storage.
if (threads.isNotEmpty) _threads = threads;
}).then((_) => retryFailedMessages());
}

final Channel _channel;
Expand Down Expand Up @@ -3357,13 +3344,30 @@ class ChannelClientState {
ChannelState get channelState => _channelStateController.value;
late BehaviorSubject<ChannelState> _channelStateController;

final Debounce _debouncedUpdatePersistenceChannelState;
late final _debouncedUpdatePersistenceChannelState = debounce(
(ChannelState state) {
final persistenceClient = _channel._client.chatPersistenceClient;
return persistenceClient?.updateChannelState(state);
},
const Duration(seconds: 1),
);

set _channelState(ChannelState v) {
_channelStateController.safeAdd(v);
_debouncedUpdatePersistenceChannelState.call([v]);
}

late final _debouncedUpdatePersistenceChannelThreads = debounce(
(Map<String, List<Message>> threads) async {
final channelCid = _channel.cid;
if (channelCid == null) return;

final persistenceClient = _channel._client.chatPersistenceClient;
return persistenceClient?.updateChannelThreads(channelCid, threads);
},
const Duration(seconds: 1),
);

/// The channel threads related to this channel.
Map<String, List<Message>> get threads => {..._threadsController.value};

Expand All @@ -3372,10 +3376,7 @@ class ChannelClientState {
final _threadsController = BehaviorSubject.seeded(<String, List<Message>>{});
set _threads(Map<String, List<Message>> threads) {
_threadsController.safeAdd(threads);
_channel.client.chatPersistenceClient?.updateChannelThreads(
_channel.cid!,
threads,
);
_debouncedUpdatePersistenceChannelThreads.call([threads]);
}

/// Clears all the replies in the thread identified by [parentId].
Expand Down Expand Up @@ -3537,6 +3538,7 @@ class ChannelClientState {

/// Call this method to dispose this object.
void dispose() {
_debouncedUpdatePersistenceChannelThreads.cancel();
_debouncedUpdatePersistenceChannelState.cancel();
_retryQueue.dispose();
_subscriptions.cancel();
Expand Down
6 changes: 6 additions & 0 deletions packages/stream_chat/lib/src/db/chat_persistence_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,11 @@ abstract class ChatPersistenceClient {
String cid,
Map<String, List<Message>> threads,
) async {
if (threads.isEmpty) return;

// Flattening the messages from threads
final messages = threads.values.expand((it) => it).toList();
if (messages.isEmpty) return;

// Removing old reactions before saving the new
final oldReactions = messages.map((it) => it.id).toList();
Expand Down Expand Up @@ -276,6 +280,8 @@ abstract class ChatPersistenceClient {

/// Update list of channel states
Future<void> updateChannelStates(List<ChannelState> channelStates) async {
if (channelStates.isEmpty) return;

final reactionsToDelete = <String>[];
final pinnedReactionsToDelete = <String>[];
final membersToDelete = <String>[];
Expand Down
68 changes: 32 additions & 36 deletions packages/stream_chat/test/src/client/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ void main() {
final persistentChannelStates = List.generate(
3,
(index) => ChannelState(
channel: ChannelModel(cid: 'p-test-type-$index:p-test-id-$index'),
channel: ChannelModel(cid: 'test-type-$index:test-id-$index'),
),
);

Expand Down Expand Up @@ -636,18 +636,19 @@ void main() {
(_) async => QueryChannelsResponse()..channels = channelStates,
);

when(() => persistence.getChannelThreads(any()))
.thenAnswer((_) async => {});
when(() => persistence.updateChannelThreads(any(), any()))
.thenAnswer((_) async => {});
when(() => persistence.getChannelStateByCid(any(),
messagePagination: any(named: 'messagePagination'),
pinnedMessagePagination:
any(named: 'pinnedMessagePagination'))).thenAnswer(
(invocation) async => ChannelState(
channel: ChannelModel(cid: invocation.positionalArguments.first),
),
when(() => persistence.getChannelThreads(any())).thenAnswer(
(_) async => <String, List<Message>>{
for (final channelState in channelStates)
channelState.channel!.cid: [
Message(id: 'test-message-id', text: 'Test message')
],
},
);

when(() => persistence.updateChannelState(any()))
.thenAnswer((_) async {});
when(() => persistence.updateChannelThreads(any(), any()))
.thenAnswer((_) async {});
when(() => persistence.updateChannelQueries(any(), any(),
clearQueryCache: any(named: 'clearQueryCache')))
.thenAnswer((_) => Future.value());
Expand All @@ -664,7 +665,7 @@ void main() {

// Hack as `teardown` gets called even
// before our stream starts emitting data
await delay(300);
await delay(1050);
Comment on lines -667 to +668
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this can also be done with expectAsync(1/2/3) instead?


verify(() => persistence.getChannelStates(
filter: any(named: 'filter'),
Expand All @@ -684,14 +685,11 @@ void main() {
)).called(1);

verify(() => persistence.getChannelThreads(any()))
.called((persistentChannelStates + channelStates).length);
.called(channelStates.length);
verify(() => persistence.updateChannelState(any()))
.called(channelStates.length);
verify(() => persistence.updateChannelThreads(any(), any()))
.called((persistentChannelStates + channelStates).length);
verify(
() => persistence.getChannelStateByCid(any(),
messagePagination: any(named: 'messagePagination'),
pinnedMessagePagination: any(named: 'pinnedMessagePagination')),
).called((persistentChannelStates + channelStates).length);
.called(channelStates.length);
verify(() => persistence.updateChannelQueries(any(), any(),
clearQueryCache: any(named: 'clearQueryCache'))).called(1);
},
Expand All @@ -703,7 +701,7 @@ void main() {
final persistentChannelStates = List.generate(
3,
(index) => ChannelState(
channel: ChannelModel(cid: 'p-test-type-$index:p-test-id-$index'),
channel: ChannelModel(cid: 'test-type-$index:test-id-$index'),
),
);

Expand All @@ -724,18 +722,19 @@ void main() {
paginationParams: any(named: 'paginationParams'),
)).thenThrow(StreamChatNetworkError(ChatErrorCode.inputError));

when(() => persistence.getChannelThreads(any()))
when(() => persistence.getChannelThreads(any())).thenAnswer(
(_) async => <String, List<Message>>{
for (final channelState in persistentChannelStates)
channelState.channel!.cid: [
Message(id: 'test-message-id', text: 'Test message')
],
},
);

when(() => persistence.updateChannelState(any()))
.thenAnswer((_) async => {});
when(() => persistence.updateChannelThreads(any(), any()))
.thenAnswer((_) async => {});
when(() => persistence.getChannelStateByCid(any(),
messagePagination: any(named: 'messagePagination'),
pinnedMessagePagination:
any(named: 'pinnedMessagePagination'))).thenAnswer(
(invocation) async => ChannelState(
channel: ChannelModel(cid: invocation.positionalArguments.first),
),
);

expectLater(
client.queryChannels(),
Expand All @@ -747,7 +746,7 @@ void main() {

// Hack as `teardown` gets called even
// before our stream starts emitting data
await delay(300);
await delay(1050);

verify(() => persistence.getChannelStates(
filter: any(named: 'filter'),
Expand All @@ -768,13 +767,10 @@ void main() {

verify(() => persistence.getChannelThreads(any()))
.called(persistentChannelStates.length);
verify(() => persistence.updateChannelState(any()))
.called(persistentChannelStates.length);
verify(() => persistence.updateChannelThreads(any(), any()))
.called(persistentChannelStates.length);
verify(
() => persistence.getChannelStateByCid(any(),
messagePagination: any(named: 'messagePagination'),
pinnedMessagePagination: any(named: 'pinnedMessagePagination')),
).called(persistentChannelStates.length);
},
);
});
Expand Down