Skip to content
2 changes: 2 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
🐞 Fixed

- Fixed `WebSocket` race condition where reconnection could access null user during disconnect.
- Fixed draft message persistence issues where removed drafts were not properly deleted from the
database.

## 9.14.0

Expand Down
9 changes: 8 additions & 1 deletion packages/stream_chat/lib/src/db/chat_persistence_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ abstract class ChatPersistenceClient {
/// Deletes all the members by channel [cids]
Future<void> deleteMembersByCids(List<String> cids);

/// Deletes all the draft messages by channel [cids]
Future<void> deleteDraftMessagesByCids(List<String> cids);

/// Updates the channel [cid] threads data along with reactions and users.
Future<void> updateChannelThreads(
String cid,
Expand Down Expand Up @@ -277,7 +280,6 @@ abstract class ChatPersistenceClient {
final channelWithPinnedMessages = <String, List<Message>?>{};
final channelWithReads = <String, List<Read>?>{};
final channelWithMembers = <String, List<Member>?>{};
final drafts = <Draft>[];

final users = <User>[];
final reactions = <Reaction>[];
Expand All @@ -287,6 +289,9 @@ abstract class ChatPersistenceClient {
final pollVotes = <PollVote>[];
final pollVotesToDelete = <String>[];

final drafts = <Draft>[];
final draftsToDeleteCids = <String>[];

for (final state in channelStates) {
final channel = state.channel;
// Continue if channel is not available.
Expand All @@ -311,6 +316,7 @@ abstract class ChatPersistenceClient {
membersToDelete.add(cid);
reactionsToDelete.addAll(messages?.map((it) => it.id) ?? []);
pinnedReactionsToDelete.addAll(pinnedMessages?.map((it) => it.id) ?? []);
draftsToDeleteCids.add(cid);

// preparing addition data
channelWithReads[cid] = reads;
Expand Down Expand Up @@ -356,6 +362,7 @@ abstract class ChatPersistenceClient {
deleteReactionsByMessageId(reactionsToDelete),
deletePinnedMessageReactionsByMessageId(pinnedReactionsToDelete),
deletePollVotesByPollIds(pollVotesToDelete),
deleteDraftMessagesByCids(draftsToDeleteCids),
]);

// Updating first as does not depend on any other table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class TestPersistenceClient extends ChatPersistenceClient {
@override
Future<void> deleteMembersByCids(List<String> cids) => Future.value();

@override
Future<void> deleteDraftMessagesByCids(List<String> cids) => Future.value();

@override
Future<void> deleteMessageByCids(List<String> cids) => Future.value();

Expand Down
7 changes: 7 additions & 0 deletions packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## Upcoming

🐞 Fixed

- Fixed draft message retrieval logic where channel drafts were incorrectly attached to all messages
instead of only thread drafts being attached to their respective parent messages.

## 9.14.0

- Updated `stream_chat` dependency to [`9.14.0`](https://pub.dev/packages/stream_chat/changelog).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,9 @@ class DraftMessageDao extends DatabaseAccessor<DriftChatDatabase>

return query.go();
}

/// Deletes all the draft messages by matching [DraftMessages.channelCid]
/// with the given list of [cids].
Future<void> deleteDraftMessagesByCids(List<String> cids) =>
(delete(draftMessages)..where((tbl) => tbl.channelCid.isIn(cids))).go();
}
60 changes: 37 additions & 23 deletions packages/stream_chat_persistence/lib/src/dao/message_dao.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>

Future<Message> _messageFromJoinRow(
TypedResult rows, {
bool fetchDraft = true,
bool fetchDraft = false,
}) async {
final userEntity = rows.readTableOrNull(_users);
final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers);
Expand All @@ -62,7 +62,7 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
final draft = await switch (fetchDraft) {
true => _db.draftMessageDao.getDraftMessageByCid(
msgEntity.channelCid,
parentId: msgEntity.parentId,
parentId: msgEntity.id,
),
_ => null,
};
Expand All @@ -85,19 +85,24 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
Future<Message?> getMessageById(
String id, {
bool fetchDraft = true,
}) async =>
await (select(messages).join(
[
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
],
)..where(messages.id.equals(id)))
.map((row) {
return _messageFromJoinRow(row, fetchDraft: fetchDraft);
}).getSingleOrNull();
}) async {
final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(messages.id.equals(id));

final result = await query.getSingleOrNull();
if (result == null) return null;

return _messageFromJoinRow(
result,
fetchDraft: fetchDraft,
);
}

/// Returns all the messages of a particular thread by matching
/// [Messages.channelCid] with [cid]
Expand Down Expand Up @@ -163,22 +168,31 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
/// [Messages.channelCid] with [parentId]
Future<List<Message>> getMessagesByCid(
String cid, {
bool fetchDraft = true,
PaginationParams? messagePagination,
}) async {
final msgList = await Future.wait(await (select(messages).join([
final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(messages.channelCid.equals(cid))
..where(
messages.parentId.isNull() | messages.showInChannel.equals(true),
)
..orderBy([OrderingTerm.asc(messages.createdAt)]))
.map(_messageFromJoinRow)
.get());
..where(messages.channelCid.equals(cid))
..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(messages.createdAt)]);

final result = await query.get();
if (result.isEmpty) return [];

final msgList = await Future.wait(
result.map(
(row) => _messageFromJoinRow(
row,
fetchDraft: fetchDraft,
),
),
);

if (msgList.isNotEmpty) {
if (messagePagination?.lessThan != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
Future<void> deleteMessageByCids(List<String> cids) async =>
(delete(pinnedMessages)..where((tbl) => tbl.channelCid.isIn(cids))).go();

Future<Message> _messageFromJoinRow(TypedResult rows) async {
final userEntity = rows.readTableOrNull(users);
Future<Message> _messageFromJoinRow(
TypedResult rows, {
bool fetchDraft = false,
}) async {
final userEntity = rows.readTableOrNull(_users);
final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers);
final msgEntity = rows.readTable(pinnedMessages);
final latestReactions =
Expand All @@ -46,38 +49,58 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
msgEntity.id,
_db.userId,
);
Message? quotedMessage;
final quotedMessageId = msgEntity.quotedMessageId;
if (quotedMessageId != null) {
quotedMessage = await getMessageById(quotedMessageId);
}
Poll? poll;
final pollId = msgEntity.pollId;
if (pollId != null) {
poll = await _db.pollDao.getPollById(pollId);
}

final quotedMessage = await switch (msgEntity.quotedMessageId) {
final id? => getMessageById(id),
_ => null,
};

final poll = await switch (msgEntity.pollId) {
final id? => _db.pollDao.getPollById(id),
_ => null,
};

final draft = await switch (fetchDraft) {
true => _db.draftMessageDao.getDraftMessageByCid(
msgEntity.channelCid,
parentId: msgEntity.id,
),
_ => null,
};

return msgEntity.toMessage(
user: userEntity?.toUser(),
pinnedBy: pinnedByEntity?.toUser(),
latestReactions: latestReactions,
ownReactions: ownReactions,
quotedMessage: quotedMessage,
poll: poll,
draft: draft,
);
}

/// Returns a single message by matching the [PinnedMessages.id] with [id]
Future<Message?> getMessageById(String id) async =>
await (select(pinnedMessages).join([
leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(pinnedMessages.id.equals(id)))
.map(_messageFromJoinRow)
.getSingleOrNull();
Future<Message?> getMessageById(
String id, {
bool fetchDraft = true,
}) async {
final query = select(pinnedMessages).join([
leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(pinnedMessages.id.equals(id));

final result = await query.getSingleOrNull();
if (result == null) return null;

return _messageFromJoinRow(
result,
fetchDraft: fetchDraft,
);
}

/// Returns all the messages of a particular thread by matching
/// [PinnedMessages.channelCid] with [cid]
Expand Down Expand Up @@ -142,21 +165,32 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
/// [PinnedMessages.channelCid] with [parentId]
Future<List<Message>> getMessagesByCid(
String cid, {
bool fetchDraft = true,
PaginationParams? messagePagination,
}) async {
final msgList = await Future.wait(await (select(pinnedMessages).join([
final query = select(pinnedMessages).join([
leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)),
leftOuterJoin(
_pinnedByUsers,
pinnedMessages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
),
])
..where(pinnedMessages.channelCid.equals(cid))
..where(pinnedMessages.parentId.isNull() |
pinnedMessages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]))
.map(_messageFromJoinRow)
.get());
..where(pinnedMessages.channelCid.equals(cid))
..where(pinnedMessages.parentId.isNull() |
pinnedMessages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]);

final result = await query.get();
if (result.isEmpty) return [];

final msgList = await Future.wait(
result.map(
(row) => _messageFromJoinRow(
row,
fetchDraft: fetchDraft,
),
),
);

if (msgList.isNotEmpty) {
if (messagePagination?.lessThan != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DriftChatDatabase extends _$DriftChatDatabase {

// you should bump this number whenever you change or add a table definition.
@override
int get schemaVersion => 21;
int get schemaVersion => 22;

@override
MigrationStrategy get migration => MigrationStrategy(
Expand Down
Loading