diff --git a/changelog.d/6-federation/parallel-rpcs b/changelog.d/6-federation/parallel-rpcs new file mode 100644 index 0000000000..53d9fb8d3f --- /dev/null +++ b/changelog.d/6-federation/parallel-rpcs @@ -0,0 +1 @@ +Make federated requests to multiple backends in parallel. diff --git a/libs/types-common/src/Data/Qualified.hs b/libs/types-common/src/Data/Qualified.hs index a1b9209d2b..4bce70e078 100644 --- a/libs/types-common/src/Data/Qualified.hs +++ b/libs/types-common/src/Data/Qualified.hs @@ -22,6 +22,7 @@ module Data.Qualified ( -- * Qualified Qualified (..), + qToPair, QualifiedWithTag, tUnqualified, tUnqualifiedL, @@ -35,15 +36,17 @@ module Data.Qualified qualifyAs, foldQualified, partitionQualified, + partitionQualifiedAndTag, indexQualified, bucketQualified, - indexRemote, + bucketRemote, deprecatedSchema, ) where import Control.Lens (Lens, lens, (?~)) import Data.Aeson (FromJSON (..), ToJSON (..)) +import Data.Bifunctor (first) import Data.Domain (Domain) import Data.Handle (Handle (..)) import Data.Id @@ -62,6 +65,9 @@ data Qualified a = Qualified } deriving stock (Eq, Ord, Show, Generic, Functor, Foldable, Traversable) +qToPair :: Qualified a -> (Domain, a) +qToPair (Qualified x dom) = (dom, x) + data QTag = QLocal | QRemote deriving (Eq, Show) @@ -125,6 +131,11 @@ partitionQualified loc = foldMap $ foldQualified loc (\l -> ([tUnqualified l], mempty)) (\r -> (mempty, [r])) +partitionQualifiedAndTag :: Foldable f => Local x -> f (Qualified a) -> ([Local a], [Remote a]) +partitionQualifiedAndTag loc = + first (map (qualifyAs loc)) + . partitionQualified loc + -- | Index a list of qualified values by domain. indexQualified :: Foldable f => f (Qualified a) -> Map Domain [a] indexQualified = foldr add mempty @@ -136,9 +147,8 @@ indexQualified = foldr add mempty bucketQualified :: Foldable f => f (Qualified a) -> [Qualified [a]] bucketQualified = map (\(d, a) -> Qualified a d) . Map.assocs . indexQualified --- FUTUREWORK: Rename this to 'bucketRemote' -indexRemote :: (Functor f, Foldable f) => f (Remote a) -> [Remote [a]] -indexRemote = +bucketRemote :: (Functor f, Foldable f) => f (Remote a) -> [Remote [a]] +bucketRemote = map (uncurry toRemoteUnsafe) . Map.assocs . indexQualified diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs index ee39c5ed59..423c7788a5 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs @@ -123,14 +123,16 @@ data FederationError | FederationNotImplemented | FederationNotConfigured | FederationCallFailure FederationClientFailure - deriving (Show, Eq) + deriving (Show, Eq, Typeable) + +instance Exception FederationError data FederationClientFailure = FederationClientFailure { fedFailDomain :: Domain, fedFailPath :: ByteString, fedFailError :: FederationClientError } - deriving (Show, Eq) + deriving (Show, Eq, Typeable) data FederationClientError = FederationClientInvalidMethod HTTP.Method @@ -139,7 +141,7 @@ data FederationClientError | FederationClientOutwardError Proto.OutwardError | FederationClientInwardError Proto.InwardError | FederationClientServantError Servant.ClientError - deriving (Show, Eq) + deriving (Show, Eq, Typeable) callRemote :: MonadIO m => GrpcClient -> Proto.ValidatedFederatedRequest -> m (GRpcReply Proto.OutwardResponse) callRemote fedClient call = liftIO $ gRpcCall @'MsgProtoBuf @Proto.Outward @"Outward" @"call" fedClient (Proto.validatedFederatedRequestToFederatedRequest call) diff --git a/libs/wire-api/src/Wire/API/Team/LegalHold.hs b/libs/wire-api/src/Wire/API/Team/LegalHold.hs index b03769225e..7dd7fc7099 100644 --- a/libs/wire-api/src/Wire/API/Team/LegalHold.hs +++ b/libs/wire-api/src/Wire/API/Team/LegalHold.hs @@ -350,7 +350,7 @@ data LegalholdProtectee | -- | add UserId here if you want to protect bots as well (or just remove and use -- 'ProtectedUser', but then you'll loose the user type information). UnprotectedBot - | -- | FUTUREWORK: protection against legalhold when looking up prekeys accross federated + | -- | FUTUREWORK: protection against legalhold when looking up prekeys across federated -- instances. LegalholdPlusFederationNotImplemented deriving (Show, Eq, Ord, Generic) diff --git a/libs/wire-api/src/Wire/API/User/Client.hs b/libs/wire-api/src/Wire/API/User/Client.hs index 58f3b573d2..6e601ecbb8 100644 --- a/libs/wire-api/src/Wire/API/User/Client.hs +++ b/libs/wire-api/src/Wire/API/User/Client.hs @@ -30,6 +30,7 @@ module Wire.API.User.Client QualifiedUserClientMap (..), QualifiedUserClientPrekeyMap (..), mkQualifiedUserClientPrekeyMap, + qualifiedUserClientPrekeyMapFromList, UserClientsFull (..), userClientsFullToUserClients, UserClients (..), @@ -84,6 +85,7 @@ import Data.Id import Data.Json.Util import qualified Data.Map.Strict as Map import Data.Misc (Latitude (..), Location, Longitude (..), PlainTextPassword (..), latitude, location, longitude, modelLocation) +import Data.Qualified import Data.Schema import qualified Data.Semigroup as Semigroup import qualified Data.Set as Set @@ -308,6 +310,12 @@ instance ToSchema QualifiedUserClientPrekeyMap where mkQualifiedUserClientPrekeyMap :: Map Domain UserClientPrekeyMap -> QualifiedUserClientPrekeyMap mkQualifiedUserClientPrekeyMap = coerce +qualifiedUserClientPrekeyMapFromList :: + [Qualified UserClientPrekeyMap] -> + QualifiedUserClientPrekeyMap +qualifiedUserClientPrekeyMapFromList = + mkQualifiedUserClientPrekeyMap . Map.fromList . map qToPair + -------------------------------------------------------------------------------- -- UserClients diff --git a/services/brig/src/Brig/API/Client.hs b/services/brig/src/Brig/API/Client.hs index 024f27fb14..e601a81ecf 100644 --- a/services/brig/src/Brig/API/Client.hs +++ b/services/brig/src/Brig/API/Client.hs @@ -45,6 +45,7 @@ module Brig.API.Client where import Brig.API.Types +import Brig.API.Util import Brig.App import qualified Brig.Data.Client as Data import qualified Brig.Data.User as Data @@ -71,17 +72,16 @@ import qualified Data.Map.Strict as Map import Data.Misc (PlainTextPassword (..)) import Data.Qualified import qualified Data.Set as Set -import Galley.Types (UserClients (..)) import Imports import Network.Wai.Utilities import System.Logger.Class (field, msg, val, (~~)) import qualified System.Logger.Class as Log import UnliftIO.Async (Concurrently (Concurrently, runConcurrently)) import Wire.API.Federation.API.Brig (GetUserClients (GetUserClients)) +import Wire.API.Federation.Client (FederationError (..)) import qualified Wire.API.Message as Message import Wire.API.Team.LegalHold (LegalholdProtectee (..)) -import Wire.API.User.Client (ClientCapabilityList (..), QualifiedUserClientPrekeyMap (..), QualifiedUserClients (..), UserClientPrekeyMap, mkQualifiedUserClientPrekeyMap, mkUserClientPrekeyMap) -import qualified Wire.API.User.Client as Client +import Wire.API.User.Client import Wire.API.UserMap (QualifiedUserMap (QualifiedUserMap, qualifiedUserMap), UserMap (userMap)) lookupLocalClient :: UserId -> ClientId -> AppIO (Maybe Client) @@ -126,14 +126,14 @@ addClient u con ip new = do acc <- lift (Data.lookupAccount u) >>= maybe (throwE (ClientUserNotFound u)) return loc <- maybe (return Nothing) locationOf ip maxPermClients <- fromMaybe Opt.defUserMaxPermClients <$> Opt.setUserMaxPermClients <$> view settings - let caps :: Maybe (Set Client.ClientCapability) + let caps :: Maybe (Set ClientCapability) caps = updlhdev $ newClientCapabilities new where updlhdev = if newClientType new == LegalHoldClientType then Just . maybe (Set.singleton lhcaps) (Set.insert lhcaps) else id - lhcaps = Client.ClientSupportsLegalholdImplicitConsent + lhcaps = ClientSupportsLegalholdImplicitConsent (clt, old, count) <- Data.addClient u clientId' new maxPermClients loc caps !>> ClientDataError let usr = accountUser acc lift $ do @@ -186,7 +186,7 @@ claimPrekey protectee u d c = do claimLocalPrekey :: LegalholdProtectee -> UserId -> ClientId -> ExceptT ClientError AppIO (Maybe ClientPrekey) claimLocalPrekey protectee user client = do - guardLegalhold protectee (Client.mkUserClients [(user, [client])]) + guardLegalhold protectee (mkUserClients [(user, [client])]) lift $ do prekey <- Data.claimPrekey user client when (isNothing prekey) (noPrekeys user client) @@ -205,7 +205,7 @@ claimPrekeyBundle protectee domain uid = do claimLocalPrekeyBundle :: LegalholdProtectee -> UserId -> ExceptT ClientError AppIO PrekeyBundle claimLocalPrekeyBundle protectee u = do clients <- map clientId <$> Data.lookupClients u - guardLegalhold protectee (Client.mkUserClients [(u, clients)]) + guardLegalhold protectee (mkUserClients [(u, clients)]) PrekeyBundle u . catMaybes <$> lift (mapM (Data.claimPrekey u) clients) claimRemotePrekeyBundle :: Qualified UserId -> ExceptT ClientError AppIO PrekeyBundle @@ -214,18 +214,33 @@ claimRemotePrekeyBundle quser = do claimMultiPrekeyBundles :: LegalholdProtectee -> QualifiedUserClients -> ExceptT ClientError AppIO QualifiedUserClientPrekeyMap claimMultiPrekeyBundles protectee quc = do - localDomain <- viewFederationDomain - fmap (mkQualifiedUserClientPrekeyMap . Map.fromList) - -- FUTUREWORK(federation): parallelise federator requests here - . traverse (\(domain, uc) -> (domain,) <$> claim localDomain domain (UserClients uc)) - . Map.assocs - . qualifiedUserClients - $ quc + loc <- qualifyLocal () + let (locals, remotes) = + partitionQualifiedAndTag + loc + ( map + (fmap UserClients . uncurry (flip Qualified)) + (Map.assocs (qualifiedUserClients quc)) + ) + localPrekeys <- traverse claimLocal locals + remotePrekeys <- + traverseConcurrentlyWithErrors + claimRemote + remotes + !>> ClientFederationError + pure . qualifiedUserClientPrekeyMapFromList $ localPrekeys <> remotePrekeys where - claim :: Domain -> Domain -> UserClients -> ExceptT ClientError AppIO UserClientPrekeyMap - claim localDomain domain uc - | domain == localDomain = claimLocalMultiPrekeyBundles protectee uc - | otherwise = Federation.claimMultiPrekeyBundle domain uc !>> ClientFederationError + claimRemote :: + Remote UserClients -> + ExceptT FederationError AppIO (Qualified UserClientPrekeyMap) + claimRemote ruc = + qUntagged . qualifyAs ruc + <$> Federation.claimMultiPrekeyBundle (tDomain ruc) (tUnqualified ruc) + + claimLocal :: Local UserClients -> ExceptT ClientError AppIO (Qualified UserClientPrekeyMap) + claimLocal luc = + qUntagged . qualifyAs luc + <$> claimLocalMultiPrekeyBundles protectee (tUnqualified luc) claimLocalMultiPrekeyBundles :: LegalholdProtectee -> UserClients -> ExceptT ClientError AppIO UserClientPrekeyMap claimLocalMultiPrekeyBundles protectee userClients = do diff --git a/services/brig/src/Brig/API/Public.hs b/services/brig/src/Brig/API/Public.hs index 809a0e1f4d..3ed826d088 100644 --- a/services/brig/src/Brig/API/Public.hs +++ b/services/brig/src/Brig/API/Public.hs @@ -918,7 +918,9 @@ getUserUnqualifiedH self uid = do getUser self (Qualified uid domain) getUser :: UserId -> Qualified UserId -> Handler (Maybe Public.UserProfile) -getUser self qualifiedUserId = API.lookupProfile self qualifiedUserId !>> fedError +getUser self qualifiedUserId = do + lself <- qualifyLocal self + API.lookupProfile lself qualifiedUserId !>> fedError getUserDisplayNameH :: JSON ::: UserId -> Handler Response getUserDisplayNameH (_ ::: self) = do @@ -946,14 +948,14 @@ listUsersByUnqualifiedIdsOrHandles self mUids mHandles = do listUsersByIdsOrHandles :: UserId -> Public.ListUsersQuery -> Handler [Public.UserProfile] listUsersByIdsOrHandles self q = do + lself <- qualifyLocal self foundUsers <- case q of Public.ListUsersByIds us -> - byIds us + byIds lself us Public.ListUsersByHandles hs -> do - loc <- qualifyLocal () - let (localHandles, _) = partitionQualified loc (fromRange hs) + let (localHandles, _) = partitionQualified lself (fromRange hs) us <- getIds localHandles - Handle.filterHandleResults self =<< byIds us + Handle.filterHandleResults lself =<< byIds lself us case foundUsers of [] -> throwStd $ notFound "None of the specified ids or handles match any users" _ -> pure foundUsers @@ -963,8 +965,8 @@ listUsersByIdsOrHandles self q = do localUsers <- catMaybes <$> traverse (lift . API.lookupHandle) localHandles domain <- viewFederationDomain pure $ map (`Qualified` domain) localUsers - byIds :: [Qualified UserId] -> Handler [Public.UserProfile] - byIds uids = API.lookupProfiles self uids !>> fedError + byIds :: Local UserId -> [Qualified UserId] -> Handler [Public.UserProfile] + byIds lself uids = API.lookupProfiles lself uids !>> fedError newtype GetActivationCodeResp = GetActivationCodeResp (Public.ActivationKey, Public.ActivationCode) diff --git a/services/brig/src/Brig/API/User.hs b/services/brig/src/Brig/API/User.hs index 79f827f616..cf8e924701 100644 --- a/services/brig/src/Brig/API/User.hs +++ b/services/brig/src/Brig/API/User.hs @@ -91,7 +91,7 @@ where import qualified Brig.API.Error as Error import qualified Brig.API.Handler as API (Handler) import Brig.API.Types -import Brig.API.Util (fetchUserIdentity, validateHandle) +import Brig.API.Util import Brig.App import qualified Brig.Code as Code import Brig.Data.Activation (ActivationEvent (..)) @@ -127,13 +127,11 @@ import Brig.User.Handle.Blacklist import Brig.User.Phone import qualified Brig.User.Search.TeamSize as TeamSize import Control.Arrow ((&&&)) -import Control.Concurrent.Async (mapConcurrently, mapConcurrently_) import Control.Error import Control.Lens (view, (^.)) import Control.Monad.Catch import Data.ByteString.Conversion import qualified Data.Currency as Currency -import Data.Domain (Domain) import Data.Handle (Handle) import Data.Id as Id import Data.Json.Util @@ -142,7 +140,7 @@ import Data.List1 (List1) import qualified Data.Map.Strict as Map import qualified Data.Metrics as Metrics import Data.Misc (PlainTextPassword (..)) -import Data.Qualified (Qualified, indexQualified) +import Data.Qualified import Data.Time.Clock (addUTCTime, diffUTCTime) import Data.UUID.V4 (nextRandom) import qualified Galley.Types.Teams as Team @@ -151,6 +149,7 @@ import Imports import Network.Wai.Utilities import qualified System.Logger.Class as Log import System.Logger.Message +import UnliftIO.Async import Wire.API.Federation.Client (FederationError (..)) import Wire.API.Routes.Internal.Brig.Connection import Wire.API.Team.Member (legalHoldStatus) @@ -1123,8 +1122,12 @@ userGC u = case (userExpire u) of deleteUserNoVerify (userId u) return u -lookupProfile :: UserId -> Qualified UserId -> ExceptT FederationError AppIO (Maybe UserProfile) -lookupProfile self other = listToMaybe <$> lookupProfiles self [other] +lookupProfile :: Local UserId -> Qualified UserId -> ExceptT FederationError AppIO (Maybe UserProfile) +lookupProfile self other = + listToMaybe + <$> lookupProfilesFromDomain + self + (fmap pure other) -- | Obtain user profiles for a list of users as they can be seen by -- a given user 'self'. User 'self' can see the 'FullProfile' of any other user 'other', @@ -1133,22 +1136,27 @@ lookupProfile self other = listToMaybe <$> lookupProfiles self [other] -- If 'self' is an unknown 'UserId', return '[]'. lookupProfiles :: -- | User 'self' on whose behalf the profiles are requested. - UserId -> + Local UserId -> -- | The users ('others') for which to obtain the profiles. [Qualified UserId] -> ExceptT FederationError AppIO [UserProfile] -lookupProfiles self others = do - localDomain <- viewFederationDomain - let userMap = indexQualified others - -- FUTUREWORK(federation): parallelise federator requests here - fold <$> traverse (uncurry (getProfiles localDomain)) (Map.assocs userMap) - where - getProfiles localDomain domain uids - | localDomain == domain = lift (lookupLocalProfiles (Just self) uids) - | otherwise = lookupRemoteProfiles domain uids - -lookupRemoteProfiles :: Domain -> [UserId] -> ExceptT FederationError AppIO [UserProfile] -lookupRemoteProfiles = Federation.getUsersByIds +lookupProfiles self others = + fmap concat $ + traverseConcurrentlyWithErrors + (lookupProfilesFromDomain self) + (bucketQualified others) + +lookupProfilesFromDomain :: + Local UserId -> Qualified [UserId] -> ExceptT FederationError AppIO [UserProfile] +lookupProfilesFromDomain self = + foldQualified + self + (lift . lookupLocalProfiles (Just (tUnqualified self)) . tUnqualified) + lookupRemoteProfiles + +lookupRemoteProfiles :: Remote [UserId] -> ExceptT FederationError AppIO [UserProfile] +lookupRemoteProfiles (qUntagged -> Qualified uids domain) = + Federation.getUsersByIds domain uids -- FUTUREWORK: This function encodes a few business rules about exposing email -- ids, but it is also very complex. Maybe this can be made easy by extracting a diff --git a/services/brig/src/Brig/API/Util.hs b/services/brig/src/Brig/API/Util.hs index 358f5d665e..05d22451cf 100644 --- a/services/brig/src/Brig/API/Util.hs +++ b/services/brig/src/Brig/API/Util.hs @@ -22,6 +22,7 @@ module Brig.API.Util logInvitationCode, validateHandle, logEmail, + traverseConcurrentlyWithErrors, ) where @@ -33,7 +34,7 @@ import qualified Brig.Data.User as Data import Brig.Types import Brig.Types.Intra (accountUser) import Control.Monad.Catch (throwM) -import Control.Monad.Trans.Except (throwE) +import Control.Monad.Trans.Except import Data.Handle (Handle, parseHandle) import Data.Id import Data.Maybe @@ -42,6 +43,8 @@ import Data.Text.Ascii (AsciiText (toText)) import Imports import System.Logger (Msg) import qualified System.Logger as Log +import UnliftIO.Async +import UnliftIO.Exception (throwIO, try) import Util.Logging (sha256String) lookupProfilesMaybeFilterSameTeamOnly :: UserId -> [UserProfile] -> Handler [UserProfile] @@ -73,3 +76,13 @@ logEmail email = logInvitationCode :: InvitationCode -> (Msg -> Msg) logInvitationCode code = Log.field "invitation_code" (toText $ fromInvitationCode code) + +-- | Traverse concurrently and fail on first error. +traverseConcurrentlyWithErrors :: + (Traversable t, Exception e) => + (a -> ExceptT e AppIO b) -> + t a -> + ExceptT e AppIO (t b) +traverseConcurrentlyWithErrors f = + ExceptT . try . (traverse (either throwIO pure) =<<) + . pooledMapConcurrentlyN 8 (runExceptT . f) diff --git a/services/brig/src/Brig/Federation/Client.hs b/services/brig/src/Brig/Federation/Client.hs index 002d3921dc..a6a33b550f 100644 --- a/services/brig/src/Brig/Federation/Client.hs +++ b/services/brig/src/Brig/Federation/Client.hs @@ -46,8 +46,8 @@ type FederationAppIO = ExceptT FederationError AppIO -- FUTUREWORK: Maybe find a way to tranform 'clientRoutes' into a client which -- only uses 'FederationAppIO' monad, then boilerplate in this module can all be -- deleted. -getUserHandleInfo :: Qualified Handle -> FederationAppIO (Maybe UserProfile) -getUserHandleInfo (Qualified handle domain) = do +getUserHandleInfo :: Remote Handle -> FederationAppIO (Maybe UserProfile) +getUserHandleInfo (qUntagged -> Qualified handle domain) = do Log.info $ Log.msg $ T.pack "Brig-federation: handle lookup call on remote backend" executeFederated domain $ getUserByHandle clientRoutes handle diff --git a/services/brig/src/Brig/User/API/Handle.hs b/services/brig/src/Brig/User/API/Handle.hs index 0481d4c26d..4ef90c1bb3 100644 --- a/services/brig/src/Brig/User/API/Handle.hs +++ b/services/brig/src/Brig/User/API/Handle.hs @@ -26,14 +26,14 @@ where import Brig.API.Error (fedError) import Brig.API.Handler (Handler) import qualified Brig.API.User as API -import Brig.App (settings, viewFederationDomain) +import Brig.App import qualified Brig.Data.User as Data import qualified Brig.Federation.Client as Federation import Brig.Options (searchSameTeamOnly) import Control.Lens (view) import Data.Handle (Handle, fromHandle) import Data.Id (UserId) -import Data.Qualified (Qualified (..)) +import Data.Qualified import Imports import Network.Wai.Utilities ((!>>)) import qualified System.Logger.Class as Log @@ -42,19 +42,23 @@ import qualified Wire.API.User as Public import Wire.API.User.Search import qualified Wire.API.User.Search as Public --- FUTUREWORK: use 'runMaybeT' to simplify this. getHandleInfo :: UserId -> Qualified Handle -> Handler (Maybe Public.UserProfile) getHandleInfo self handle = do - domain <- viewFederationDomain - if qDomain handle == domain - then getLocalHandleInfo self (qUnqualified handle) - else getRemoteHandleInfo - where - getRemoteHandleInfo = do - Log.info $ Log.msg (Log.val "getHandleInfo - remote lookup") Log.~~ Log.field "domain" (show (qDomain handle)) - Federation.getUserHandleInfo handle !>> fedError + lself <- qualifyLocal self + foldQualified + lself + (getLocalHandleInfo lself . tUnqualified) + getRemoteHandleInfo + handle -getLocalHandleInfo :: UserId -> Handle -> Handler (Maybe Public.UserProfile) +getRemoteHandleInfo :: Remote Handle -> Handler (Maybe Public.UserProfile) +getRemoteHandleInfo handle = do + Log.info $ + Log.msg (Log.val "getHandleInfo - remote lookup") + . Log.field "domain" (show (tDomain handle)) + Federation.getUserHandleInfo handle !>> fedError + +getLocalHandleInfo :: Local UserId -> Handle -> Handler (Maybe Public.UserProfile) getLocalHandleInfo self handle = do Log.info $ Log.msg $ Log.val "getHandleInfo - local lookup" maybeOwnerId <- lift $ API.lookupHandle handle @@ -67,12 +71,12 @@ getLocalHandleInfo self handle = do return $ listToMaybe owner -- | Checks search permissions and filters accordingly -filterHandleResults :: UserId -> [Public.UserProfile] -> Handler [Public.UserProfile] +filterHandleResults :: Local UserId -> [Public.UserProfile] -> Handler [Public.UserProfile] filterHandleResults searchingUser us = do sameTeamSearchOnly <- fromMaybe False <$> view (settings . searchSameTeamOnly) if sameTeamSearchOnly then do - fromTeam <- lift $ Data.lookupUserTeam searchingUser + fromTeam <- lift $ Data.lookupUserTeam (tUnqualified searchingUser) return $ case fromTeam of Just team -> filter (\x -> Public.profileTeam x == Just team) us Nothing -> us diff --git a/services/brig/src/Brig/User/API/Search.hs b/services/brig/src/Brig/User/API/Search.hs index ecaccf457b..b0c3a8f6c3 100644 --- a/services/brig/src/Brig/User/API/Search.hs +++ b/services/brig/src/Brig/User/API/Search.hs @@ -188,13 +188,14 @@ searchLocally searcherId searchTerm maybeMaxResults = do exactHandleSearch :: TeamSearchInfo -> Handler (Maybe Contact) exactHandleSearch teamSearchInfo = do + lsearcherId <- qualifyLocal searcherId let searchedHandleMaybe = parseHandle searchTerm exactHandleResult <- case searchedHandleMaybe of Nothing -> pure Nothing Just searchedHandle -> contactFromProfile - <$$> HandleAPI.getLocalHandleInfo searcherId searchedHandle + <$$> HandleAPI.getLocalHandleInfo lsearcherId searchedHandle pure $ case teamSearchInfo of Search.TeamOnly t -> if Just t == (contactTeam =<< exactHandleResult) diff --git a/services/galley/src/Galley/API/Message.hs b/services/galley/src/Galley/API/Message.hs index 8429863851..1df97be4f3 100644 --- a/services/galley/src/Galley/API/Message.hs +++ b/services/galley/src/Galley/API/Message.hs @@ -183,7 +183,7 @@ getRemoteClients :: [RemoteMember] -> Galley (Map (Domain, UserId) (Set ClientId getRemoteClients remoteMembers = do fmap mconcat -- concatenating maps is correct here, because their sets of keys are disjoint . pooledMapConcurrentlyN 8 getRemoteClientsFromDomain - . indexRemote + . bucketRemote . map rmId $ remoteMembers where diff --git a/services/galley/src/Galley/API/Query.hs b/services/galley/src/Galley/API/Query.hs index 7212b12346..e05ee41249 100644 --- a/services/galley/src/Galley/API/Query.hs +++ b/services/galley/src/Galley/API/Query.hs @@ -181,7 +181,7 @@ getRemoteConversationsWithFailures zusr convs = do -- request conversations from remote backends fmap (bimap (localFailures <>) concat . partitionEithers) - . pooledForConcurrentlyN 8 (indexRemote locallyFound) + . pooledForConcurrentlyN 8 (bucketRemote locallyFound) $ \someConvs -> do let req = FederatedGalley.GetConversationsRequest zusr (tUnqualified someConvs) rpc = FederatedGalley.getConversations FederatedGalley.clientRoutes localDomain req diff --git a/services/galley/src/Galley/API/Update.hs b/services/galley/src/Galley/API/Update.hs index 4a4dce798e..618834640f 100644 --- a/services/galley/src/Galley/API/Update.hs +++ b/services/galley/src/Galley/API/Update.hs @@ -1035,7 +1035,7 @@ notifyConversationMetadataUpdate quid con (qUntagged -> qcnv) targets action = d let e = conversationActionToEvent now quid qcnv action -- notify remote participants - let rusersByDomain = indexRemote (toList (bmRemotes targets)) + let rusersByDomain = bucketRemote (toList (bmRemotes targets)) void . pooledForConcurrentlyN 8 rusersByDomain $ \(qUntagged -> Qualified uids domain) -> do let req = FederatedGalley.ConversationUpdate now quid (qUnqualified qcnv) uids action rpc = diff --git a/services/galley/src/Galley/API/Util.hs b/services/galley/src/Galley/API/Util.hs index fae4c5a54c..0f84472b0f 100644 --- a/services/galley/src/Galley/API/Util.hs +++ b/services/galley/src/Galley/API/Util.hs @@ -58,7 +58,7 @@ import Network.HTTP.Types import Network.Wai import Network.Wai.Predicate hiding (Error) import Network.Wai.Utilities -import UnliftIO (concurrently) +import UnliftIO.Async import qualified Wire.API.Conversation as Public import Wire.API.Conversation.Action (ConversationAction (..), conversationActionTag) import Wire.API.ErrorDescription @@ -610,7 +610,7 @@ qualifyLocal a = toLocalUnsafe <$> viewFederationDomain <*> pure a checkRemoteUsersExist :: (Functor f, Foldable f) => f (Remote UserId) -> Galley () checkRemoteUsersExist = -- FUTUREWORK: pooledForConcurrentlyN_ instead of sequential checks per domain - traverse_ checkRemotesFor . indexRemote + traverse_ checkRemotesFor . bucketRemote checkRemotesFor :: Remote [UserId] -> Galley () checkRemotesFor (qUntagged -> Qualified uids domain) = do @@ -636,10 +636,26 @@ runFederated remoteDomain rpc = do runExceptT (executeFederated remoteDomain rpc) >>= either (throwM . federationErrorToWai) pure +runFederatedConcurrently :: + (Foldable f, Functor f) => + f (Remote a) -> + (Remote [a] -> FederatedGalleyRPC c b) -> + Galley [Remote b] +runFederatedConcurrently xs rpc = + pooledForConcurrentlyN 8 (bucketRemote xs) $ \r -> + qualifyAs r <$> runFederated (tDomain r) (rpc r) + +runFederatedConcurrently_ :: + (Foldable f, Functor f) => + f (Remote a) -> + (Remote [a] -> FederatedGalleyRPC c ()) -> + Galley () +runFederatedConcurrently_ xs = void . runFederatedConcurrently xs + -- | Convert an internal conversation representation 'Data.Conversation' to -- 'NewRemoteConversation' to be sent over the wire to a remote backend that will -- reconstruct this into multiple public-facing --- 'Wire.API.Conversation.Convevrsation' values, one per user from that remote +-- 'Wire.API.Conversation.Conversation' values, one per user from that remote -- backend. -- -- FUTUREWORK: Include the team ID as well once it becomes qualified. @@ -741,23 +757,10 @@ registerRemoteConversationMemberships :: Data.Conversation -> Galley () registerRemoteConversationMemberships now localDomain c = do - let rc = toNewRemoteConversation now localDomain c - -- FUTUREWORK: parallelise federated requests - traverse_ (registerRemoteConversations rc) - . Map.keys - . indexQualified - . nubOrd - . map (qUntagged . rmId) - . Data.convRemoteMembers - $ c - where - registerRemoteConversations :: - NewRemoteConversation ConvId -> - Domain -> - Galley () - registerRemoteConversations rc domain = do - let rpc = FederatedGalley.onConversationCreated FederatedGalley.clientRoutes localDomain rc - runFederated domain rpc + let allRemoteMembers = nubOrd (map rmId (Data.convRemoteMembers c)) + rc = toNewRemoteConversation now localDomain c + runFederatedConcurrently_ allRemoteMembers $ \_ -> + FederatedGalley.onConversationCreated FederatedGalley.clientRoutes localDomain rc -------------------------------------------------------------------------------- -- Legalhold diff --git a/services/galley/src/Galley/Data.hs b/services/galley/src/Galley/Data.hs index 0e3267a7f5..d8e95c5872 100644 --- a/services/galley/src/Galley/Data.hs +++ b/services/galley/src/Galley/Data.hs @@ -617,7 +617,7 @@ remoteConversationStatus :: remoteConversationStatus uid = fmap mconcat . pooledMapConcurrentlyN 8 (remoteConversationStatusOnDomain uid) - . indexRemote + . bucketRemote remoteConversationStatusOnDomain :: MonadClient m => UserId -> Remote [ConvId] -> m (Map (Remote ConvId) MemberStatus) remoteConversationStatusOnDomain uid rconvs =