Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/remove-in-queries
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Avoid using IN queries for fetching multiple conversations
47 changes: 20 additions & 27 deletions services/galley/src/Galley/Cassandra/Conversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ where

import Cassandra hiding (Set)
import qualified Cassandra as Cql
import Control.Error.Util
import Control.Monad.Trans.Maybe
import Data.ByteString.Conversion
import Data.Id
Expand Down Expand Up @@ -176,37 +177,29 @@ conversationGC conv =
lift (deleteConversation (convId conv)) *> mzero
]

localConversation :: ConvId -> Client (Maybe Conversation)
localConversation cid =
UnliftIO.runConcurrently $
toConv cid
<$> UnliftIO.Concurrently (members cid)
<*> UnliftIO.Concurrently (lookupRemoteMembers cid)
<*> UnliftIO.Concurrently (retry x1 $ query1 Cql.selectConv (params LocalQuorum (Identity cid)))

localConversations ::
(Members '[Embed IO, Input ClientState, TinyLog] r) =>
Members '[Embed IO, Input ClientState, TinyLog] r =>
[ConvId] ->
Sem r [Conversation]
localConversations [] = return []
localConversations ids = do
cs <- embedClient $ do
convs <- UnliftIO.async fetchConvs
mems <- UnliftIO.async $ memberLists ids
remoteMems <- UnliftIO.async $ remoteMemberLists ids
zipWith4 toConv ids
<$> UnliftIO.wait mems
<*> UnliftIO.wait remoteMems
<*> UnliftIO.wait convs
foldrM flatten [] (zip ids cs)
localConversations =
(collectAndLog =<<)
. embedClient
. UnliftIO.pooledMapConcurrentlyN 8 localConversation'
where
fetchConvs = do
cs <- retry x1 $ query Cql.selectConvs (params LocalQuorum (Identity ids))
let m =
Map.fromList $
map
( \(cId, cType, uId, access, aRolesFromLegacy, aRoles, name, tId, del, timer, rm, p, gid, mep) ->
(cId, (cType, uId, access, aRolesFromLegacy, aRoles, name, tId, del, timer, rm, p, gid, mep))
)
cs
return $ map (`Map.lookup` m) ids
flatten (i, c) cc = case c of
Nothing -> do
warn $ Log.msg ("No conversation for: " <> toByteString i)
return cc
Just c' -> return (c' : cc)
collectAndLog cs = case partitionEithers cs of
(errs, convs) -> traverse_ (warn . Log.msg) errs $> convs

localConversation' :: ConvId -> Client (Either ByteString Conversation)
localConversation' cid =
note ("No conversation for: " <> toByteString' cid) <$> localConversation cid

-- | Takes a list of conversation ids and returns those found for the given
-- user.
Expand Down
44 changes: 11 additions & 33 deletions services/galley/src/Galley/Cassandra/Conversation/Members.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
module Galley.Cassandra.Conversation.Members
( addMembers,
members,
memberLists,
remoteMemberLists,
lookupRemoteMembers,
removeMembersFromLocalConv,
toMemberStatus,
Expand All @@ -28,10 +26,8 @@ module Galley.Cassandra.Conversation.Members
where

import Cassandra
import Data.Domain
import Data.Id
import qualified Data.List.Extra as List
import qualified Data.Map as Map
import Data.Monoid
import Data.Qualified
import qualified Data.Set as Set
Expand Down Expand Up @@ -117,21 +113,10 @@ removeRemoteMembersFromLocalConv cnv victims = do
for_ victims $ \(qUntagged -> Qualified uid domain) ->
addPrepQuery Cql.removeRemoteMember (cnv, domain, uid)

memberLists :: [ConvId] -> Client [[LocalMember]]
memberLists convs = do
mems <- retry x1 $ query Cql.selectMembers (params LocalQuorum (Identity convs))
let convMembers = foldr (\m acc -> insert (mkMem m) acc) mempty mems
return $ map (\c -> fromMaybe [] (Map.lookup c convMembers)) convs
where
insert (_, Nothing) acc = acc
insert (conv, Just mem) acc =
let f = (Just . maybe [mem] (mem :))
in Map.alter f conv acc
mkMem (cnv, usr, srv, prv, st, omus, omur, oar, oarr, hid, hidr, crn, clients) =
(cnv, toMember (usr, srv, prv, st, omus, omur, oar, oarr, hid, hidr, crn, clients))

members :: ConvId -> Client [LocalMember]
members = fmap concat . memberLists . pure
members conv =
fmap (catMaybes . map toMember) . retry x1 $
query Cql.selectMembers (params LocalQuorum (Identity conv))

toMemberStatus ::
( -- otr muted
Expand Down Expand Up @@ -185,29 +170,22 @@ toMember (usr, srv, prv, Just 0, omus, omur, oar, oarr, hid, hidr, crn, cs) =
}
toMember _ = Nothing

toRemoteMember :: UserId -> Domain -> RoleName -> RemoteMember
toRemoteMember u d = RemoteMember (toRemoteUnsafe d u)

newRemoteMemberWithRole :: Remote (UserId, RoleName) -> RemoteMember
newRemoteMemberWithRole ur@(qUntagged -> (Qualified (u, r) _)) =
RemoteMember
{ rmId = qualifyAs ur u,
rmConvRoleName = r
}

remoteMemberLists :: [ConvId] -> Client [[RemoteMember]]
remoteMemberLists convs = do
mems <- retry x1 $ query Cql.selectRemoteMembers (params LocalQuorum (Identity convs))
let convMembers = foldr (insert . mkMem) Map.empty mems
return $ map (\c -> fromMaybe [] (Map.lookup c convMembers)) convs
where
insert (conv, mem) acc =
let f = (Just . maybe [mem] (mem :))
in Map.alter f conv acc
mkMem (cnv, domain, usr, role) = (cnv, toRemoteMember usr domain role)

lookupRemoteMembers :: ConvId -> Client [RemoteMember]
lookupRemoteMembers conv = join <$> remoteMemberLists [conv]
lookupRemoteMembers conv = do
fmap (map mkMem) . retry x1 $ query Cql.selectRemoteMembers (params LocalQuorum (Identity conv))
where
mkMem (domain, usr, role) =
RemoteMember
{ rmId = toRemoteUnsafe domain usr,
rmConvRoleName = role
}

member ::
ConvId ->
Expand Down
11 changes: 4 additions & 7 deletions services/galley/src/Galley/Cassandra/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ updateTeamStatus = "update team set status = ? where team = ?"
selectConv :: PrepQuery R (Identity ConvId) (ConvType, UserId, Maybe (C.Set Access), Maybe AccessRoleLegacy, Maybe (C.Set AccessRoleV2), Maybe Text, Maybe TeamId, Maybe Bool, Maybe Milliseconds, Maybe ReceiptMode, Maybe ProtocolTag, Maybe GroupId, Maybe Epoch)
selectConv = "select type, creator, access, access_role, access_roles_v2, name, team, deleted, message_timer, receipt_mode, protocol, group_id, epoch from conversation where conv = ?"

selectConvs :: PrepQuery R (Identity [ConvId]) (ConvId, ConvType, UserId, Maybe (C.Set Access), Maybe AccessRoleLegacy, Maybe (C.Set AccessRoleV2), Maybe Text, Maybe TeamId, Maybe Bool, Maybe Milliseconds, Maybe ReceiptMode, Maybe ProtocolTag, Maybe GroupId, Maybe Epoch)
selectConvs = "select conv, type, creator, access, access_role, access_roles_v2, name, team, deleted, message_timer, receipt_mode, protocol, group_id, epoch from conversation where conv in ?"

selectReceiptMode :: PrepQuery R (Identity ConvId) (Identity (Maybe ReceiptMode))
selectReceiptMode = "select receipt_mode from conversation where conv = ?"

Expand Down Expand Up @@ -272,8 +269,8 @@ type MemberStatus = Int32
selectMember :: PrepQuery R (ConvId, UserId) (UserId, Maybe ServiceId, Maybe ProviderId, Maybe MemberStatus, Maybe MutedStatus, Maybe Text, Maybe Bool, Maybe Text, Maybe Bool, Maybe Text, Maybe RoleName, Maybe (C.Set ClientId))
selectMember = "select user, service, provider, status, otr_muted_status, otr_muted_ref, otr_archived, otr_archived_ref, hidden, hidden_ref, conversation_role, mls_clients from member where conv = ? and user = ?"

selectMembers :: PrepQuery R (Identity [ConvId]) (ConvId, UserId, Maybe ServiceId, Maybe ProviderId, Maybe MemberStatus, Maybe MutedStatus, Maybe Text, Maybe Bool, Maybe Text, Maybe Bool, Maybe Text, Maybe RoleName, Maybe (C.Set ClientId))
selectMembers = "select conv, user, service, provider, status, otr_muted_status, otr_muted_ref, otr_archived, otr_archived_ref, hidden, hidden_ref, conversation_role, mls_clients from member where conv in ?"
selectMembers :: PrepQuery R (Identity ConvId) (UserId, Maybe ServiceId, Maybe ProviderId, Maybe MemberStatus, Maybe MutedStatus, Maybe Text, Maybe Bool, Maybe Text, Maybe Bool, Maybe Text, Maybe RoleName, Maybe (C.Set ClientId))
selectMembers = "select user, service, provider, status, otr_muted_status, otr_muted_ref, otr_archived, otr_archived_ref, hidden, hidden_ref, conversation_role, mls_clients from member where conv = ?"

insertMember :: PrepQuery W (ConvId, UserId, Maybe ServiceId, Maybe ProviderId, RoleName, Maybe (C.Set ClientId)) ()
insertMember = "insert into member (conv, user, service, provider, status, conversation_role, mls_clients) values (?, ?, ?, ?, 0, ?, ?)"
Expand Down Expand Up @@ -305,8 +302,8 @@ insertRemoteMember = "insert into member_remote_user (conv, user_remote_domain,
removeRemoteMember :: PrepQuery W (ConvId, Domain, UserId) ()
removeRemoteMember = "delete from member_remote_user where conv = ? and user_remote_domain = ? and user_remote_id = ?"

selectRemoteMembers :: PrepQuery R (Identity [ConvId]) (ConvId, Domain, UserId, RoleName)
selectRemoteMembers = "select conv, user_remote_domain, user_remote_id, conversation_role from member_remote_user where conv in ?"
selectRemoteMembers :: PrepQuery R (Identity ConvId) (Domain, UserId, RoleName)
selectRemoteMembers = "select user_remote_domain, user_remote_id, conversation_role from member_remote_user where conv = ?"

updateRemoteMemberConvRoleName :: PrepQuery W (RoleName, ConvId, Domain, UserId) ()
updateRemoteMemberConvRoleName = "update member_remote_user set conversation_role = ? where conv = ? and user_remote_domain = ? and user_remote_id = ?"
Expand Down