Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6-federation/parallel-rpcs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make federated requests to multiple backends in parallel.
18 changes: 14 additions & 4 deletions libs/types-common/src/Data/Qualified.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
module Data.Qualified
( -- * Qualified
Qualified (..),
qToPair,
QualifiedWithTag,
tUnqualified,
tUnqualifiedL,
Expand All @@ -35,15 +36,17 @@ module Data.Qualified
qualifyAs,
foldQualified,
partitionQualified,
partitionQualifiedAndTag,
indexQualified,
bucketQualified,
indexRemote,
bucketRemote,
deprecatedSchema,
)
where

import Control.Lens (Lens, lens, (?~))
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Bifunctor (first)
import Data.Domain (Domain)
import Data.Handle (Handle (..))
import Data.Id
Expand All @@ -62,6 +65,9 @@ data Qualified a = Qualified
}
deriving stock (Eq, Ord, Show, Generic, Functor, Foldable, Traversable)

qToPair :: Qualified a -> (Domain, a)
qToPair (Qualified x dom) = (dom, x)

data QTag = QLocal | QRemote
deriving (Eq, Show)

Expand Down Expand Up @@ -125,6 +131,11 @@ partitionQualified loc =
foldMap $
foldQualified loc (\l -> ([tUnqualified l], mempty)) (\r -> (mempty, [r]))

partitionQualifiedAndTag :: Foldable f => Local x -> f (Qualified a) -> ([Local a], [Remote a])
partitionQualifiedAndTag loc =
first (map (qualifyAs loc))
. partitionQualified loc

-- | Index a list of qualified values by domain.
indexQualified :: Foldable f => f (Qualified a) -> Map Domain [a]
indexQualified = foldr add mempty
Expand All @@ -136,9 +147,8 @@ indexQualified = foldr add mempty
bucketQualified :: Foldable f => f (Qualified a) -> [Qualified [a]]
bucketQualified = map (\(d, a) -> Qualified a d) . Map.assocs . indexQualified

-- FUTUREWORK: Rename this to 'bucketRemote'
indexRemote :: (Functor f, Foldable f) => f (Remote a) -> [Remote [a]]
indexRemote =
bucketRemote :: (Functor f, Foldable f) => f (Remote a) -> [Remote [a]]
bucketRemote =
map (uncurry toRemoteUnsafe)
. Map.assocs
. indexQualified
Expand Down
8 changes: 5 additions & 3 deletions libs/wire-api-federation/src/Wire/API/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ data FederationError
| FederationNotImplemented
| FederationNotConfigured
| FederationCallFailure FederationClientFailure
deriving (Show, Eq)
deriving (Show, Eq, Typeable)

instance Exception FederationError

data FederationClientFailure = FederationClientFailure
{ fedFailDomain :: Domain,
fedFailPath :: ByteString,
fedFailError :: FederationClientError
}
deriving (Show, Eq)
deriving (Show, Eq, Typeable)

data FederationClientError
= FederationClientInvalidMethod HTTP.Method
Expand All @@ -139,7 +141,7 @@ data FederationClientError
| FederationClientOutwardError Proto.OutwardError
| FederationClientInwardError Proto.InwardError
| FederationClientServantError Servant.ClientError
deriving (Show, Eq)
deriving (Show, Eq, Typeable)

callRemote :: MonadIO m => GrpcClient -> Proto.ValidatedFederatedRequest -> m (GRpcReply Proto.OutwardResponse)
callRemote fedClient call = liftIO $ gRpcCall @'MsgProtoBuf @Proto.Outward @"Outward" @"call" fedClient (Proto.validatedFederatedRequestToFederatedRequest call)
Expand Down
2 changes: 1 addition & 1 deletion libs/wire-api/src/Wire/API/Team/LegalHold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ data LegalholdProtectee
| -- | add UserId here if you want to protect bots as well (or just remove and use
-- 'ProtectedUser', but then you'll loose the user type information).
UnprotectedBot
| -- | FUTUREWORK: protection against legalhold when looking up prekeys accross federated
| -- | FUTUREWORK: protection against legalhold when looking up prekeys across federated
-- instances.
LegalholdPlusFederationNotImplemented
deriving (Show, Eq, Ord, Generic)
Expand Down
8 changes: 8 additions & 0 deletions libs/wire-api/src/Wire/API/User/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module Wire.API.User.Client
QualifiedUserClientMap (..),
QualifiedUserClientPrekeyMap (..),
mkQualifiedUserClientPrekeyMap,
qualifiedUserClientPrekeyMapFromList,
UserClientsFull (..),
userClientsFullToUserClients,
UserClients (..),
Expand Down Expand Up @@ -84,6 +85,7 @@ import Data.Id
import Data.Json.Util
import qualified Data.Map.Strict as Map
import Data.Misc (Latitude (..), Location, Longitude (..), PlainTextPassword (..), latitude, location, longitude, modelLocation)
import Data.Qualified
import Data.Schema
import qualified Data.Semigroup as Semigroup
import qualified Data.Set as Set
Expand Down Expand Up @@ -308,6 +310,12 @@ instance ToSchema QualifiedUserClientPrekeyMap where
mkQualifiedUserClientPrekeyMap :: Map Domain UserClientPrekeyMap -> QualifiedUserClientPrekeyMap
mkQualifiedUserClientPrekeyMap = coerce

qualifiedUserClientPrekeyMapFromList ::
[Qualified UserClientPrekeyMap] ->
QualifiedUserClientPrekeyMap
qualifiedUserClientPrekeyMapFromList =
mkQualifiedUserClientPrekeyMap . Map.fromList . map qToPair

--------------------------------------------------------------------------------
-- UserClients

Expand Down
51 changes: 33 additions & 18 deletions services/brig/src/Brig/API/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ module Brig.API.Client
where

import Brig.API.Types
import Brig.API.Util
import Brig.App
import qualified Brig.Data.Client as Data
import qualified Brig.Data.User as Data
Expand All @@ -71,17 +72,16 @@ import qualified Data.Map.Strict as Map
import Data.Misc (PlainTextPassword (..))
import Data.Qualified
import qualified Data.Set as Set
import Galley.Types (UserClients (..))
import Imports
import Network.Wai.Utilities
import System.Logger.Class (field, msg, val, (~~))
import qualified System.Logger.Class as Log
import UnliftIO.Async (Concurrently (Concurrently, runConcurrently))
import Wire.API.Federation.API.Brig (GetUserClients (GetUserClients))
import Wire.API.Federation.Client (FederationError (..))
import qualified Wire.API.Message as Message
import Wire.API.Team.LegalHold (LegalholdProtectee (..))
import Wire.API.User.Client (ClientCapabilityList (..), QualifiedUserClientPrekeyMap (..), QualifiedUserClients (..), UserClientPrekeyMap, mkQualifiedUserClientPrekeyMap, mkUserClientPrekeyMap)
import qualified Wire.API.User.Client as Client
import Wire.API.User.Client
import Wire.API.UserMap (QualifiedUserMap (QualifiedUserMap, qualifiedUserMap), UserMap (userMap))

lookupLocalClient :: UserId -> ClientId -> AppIO (Maybe Client)
Expand Down Expand Up @@ -126,14 +126,14 @@ addClient u con ip new = do
acc <- lift (Data.lookupAccount u) >>= maybe (throwE (ClientUserNotFound u)) return
loc <- maybe (return Nothing) locationOf ip
maxPermClients <- fromMaybe Opt.defUserMaxPermClients <$> Opt.setUserMaxPermClients <$> view settings
let caps :: Maybe (Set Client.ClientCapability)
let caps :: Maybe (Set ClientCapability)
caps = updlhdev $ newClientCapabilities new
where
updlhdev =
if newClientType new == LegalHoldClientType
then Just . maybe (Set.singleton lhcaps) (Set.insert lhcaps)
else id
lhcaps = Client.ClientSupportsLegalholdImplicitConsent
lhcaps = ClientSupportsLegalholdImplicitConsent
(clt, old, count) <- Data.addClient u clientId' new maxPermClients loc caps !>> ClientDataError
let usr = accountUser acc
lift $ do
Expand Down Expand Up @@ -186,7 +186,7 @@ claimPrekey protectee u d c = do

claimLocalPrekey :: LegalholdProtectee -> UserId -> ClientId -> ExceptT ClientError AppIO (Maybe ClientPrekey)
claimLocalPrekey protectee user client = do
guardLegalhold protectee (Client.mkUserClients [(user, [client])])
guardLegalhold protectee (mkUserClients [(user, [client])])
lift $ do
prekey <- Data.claimPrekey user client
when (isNothing prekey) (noPrekeys user client)
Expand All @@ -205,7 +205,7 @@ claimPrekeyBundle protectee domain uid = do
claimLocalPrekeyBundle :: LegalholdProtectee -> UserId -> ExceptT ClientError AppIO PrekeyBundle
claimLocalPrekeyBundle protectee u = do
clients <- map clientId <$> Data.lookupClients u
guardLegalhold protectee (Client.mkUserClients [(u, clients)])
guardLegalhold protectee (mkUserClients [(u, clients)])
PrekeyBundle u . catMaybes <$> lift (mapM (Data.claimPrekey u) clients)

claimRemotePrekeyBundle :: Qualified UserId -> ExceptT ClientError AppIO PrekeyBundle
Expand All @@ -214,18 +214,33 @@ claimRemotePrekeyBundle quser = do

claimMultiPrekeyBundles :: LegalholdProtectee -> QualifiedUserClients -> ExceptT ClientError AppIO QualifiedUserClientPrekeyMap
claimMultiPrekeyBundles protectee quc = do
localDomain <- viewFederationDomain
fmap (mkQualifiedUserClientPrekeyMap . Map.fromList)
-- FUTUREWORK(federation): parallelise federator requests here
. traverse (\(domain, uc) -> (domain,) <$> claim localDomain domain (UserClients uc))
. Map.assocs
. qualifiedUserClients
$ quc
loc <- qualifyLocal ()
let (locals, remotes) =
partitionQualifiedAndTag
loc
( map
(fmap UserClients . uncurry (flip Qualified))
(Map.assocs (qualifiedUserClients quc))
)
localPrekeys <- traverse claimLocal locals
remotePrekeys <-
traverseConcurrentlyWithErrors
claimRemote
remotes
!>> ClientFederationError
pure . qualifiedUserClientPrekeyMapFromList $ localPrekeys <> remotePrekeys
where
claim :: Domain -> Domain -> UserClients -> ExceptT ClientError AppIO UserClientPrekeyMap
claim localDomain domain uc
| domain == localDomain = claimLocalMultiPrekeyBundles protectee uc
| otherwise = Federation.claimMultiPrekeyBundle domain uc !>> ClientFederationError
claimRemote ::
Remote UserClients ->
ExceptT FederationError AppIO (Qualified UserClientPrekeyMap)
claimRemote ruc =
qUntagged . qualifyAs ruc
<$> Federation.claimMultiPrekeyBundle (tDomain ruc) (tUnqualified ruc)

claimLocal :: Local UserClients -> ExceptT ClientError AppIO (Qualified UserClientPrekeyMap)
claimLocal luc =
qUntagged . qualifyAs luc
<$> claimLocalMultiPrekeyBundles protectee (tUnqualified luc)

claimLocalMultiPrekeyBundles :: LegalholdProtectee -> UserClients -> ExceptT ClientError AppIO UserClientPrekeyMap
claimLocalMultiPrekeyBundles protectee userClients = do
Expand Down
16 changes: 9 additions & 7 deletions services/brig/src/Brig/API/Public.hs
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,9 @@ getUserUnqualifiedH self uid = do
getUser self (Qualified uid domain)

getUser :: UserId -> Qualified UserId -> Handler (Maybe Public.UserProfile)
getUser self qualifiedUserId = API.lookupProfile self qualifiedUserId !>> fedError
getUser self qualifiedUserId = do
lself <- qualifyLocal self
API.lookupProfile lself qualifiedUserId !>> fedError

getUserDisplayNameH :: JSON ::: UserId -> Handler Response
getUserDisplayNameH (_ ::: self) = do
Expand Down Expand Up @@ -946,14 +948,14 @@ listUsersByUnqualifiedIdsOrHandles self mUids mHandles = do

listUsersByIdsOrHandles :: UserId -> Public.ListUsersQuery -> Handler [Public.UserProfile]
listUsersByIdsOrHandles self q = do
lself <- qualifyLocal self
foundUsers <- case q of
Public.ListUsersByIds us ->
byIds us
byIds lself us
Public.ListUsersByHandles hs -> do
loc <- qualifyLocal ()
let (localHandles, _) = partitionQualified loc (fromRange hs)
let (localHandles, _) = partitionQualified lself (fromRange hs)
us <- getIds localHandles
Handle.filterHandleResults self =<< byIds us
Handle.filterHandleResults lself =<< byIds lself us
case foundUsers of
[] -> throwStd $ notFound "None of the specified ids or handles match any users"
_ -> pure foundUsers
Expand All @@ -963,8 +965,8 @@ listUsersByIdsOrHandles self q = do
localUsers <- catMaybes <$> traverse (lift . API.lookupHandle) localHandles
domain <- viewFederationDomain
pure $ map (`Qualified` domain) localUsers
byIds :: [Qualified UserId] -> Handler [Public.UserProfile]
byIds uids = API.lookupProfiles self uids !>> fedError
byIds :: Local UserId -> [Qualified UserId] -> Handler [Public.UserProfile]
byIds lself uids = API.lookupProfiles lself uids !>> fedError

newtype GetActivationCodeResp
= GetActivationCodeResp (Public.ActivationKey, Public.ActivationCode)
Expand Down
46 changes: 27 additions & 19 deletions services/brig/src/Brig/API/User.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
import qualified Brig.API.Error as Error
import qualified Brig.API.Handler as API (Handler)
import Brig.API.Types
import Brig.API.Util (fetchUserIdentity, validateHandle)
import Brig.API.Util
import Brig.App
import qualified Brig.Code as Code
import Brig.Data.Activation (ActivationEvent (..))
Expand Down Expand Up @@ -127,13 +127,11 @@ import Brig.User.Handle.Blacklist
import Brig.User.Phone
import qualified Brig.User.Search.TeamSize as TeamSize
import Control.Arrow ((&&&))
import Control.Concurrent.Async (mapConcurrently, mapConcurrently_)
import Control.Error
import Control.Lens (view, (^.))
import Control.Monad.Catch
import Data.ByteString.Conversion
import qualified Data.Currency as Currency
import Data.Domain (Domain)
import Data.Handle (Handle)
import Data.Id as Id
import Data.Json.Util
Expand All @@ -142,7 +140,7 @@ import Data.List1 (List1)
import qualified Data.Map.Strict as Map
import qualified Data.Metrics as Metrics
import Data.Misc (PlainTextPassword (..))
import Data.Qualified (Qualified, indexQualified)
import Data.Qualified
import Data.Time.Clock (addUTCTime, diffUTCTime)
import Data.UUID.V4 (nextRandom)
import qualified Galley.Types.Teams as Team
Expand All @@ -151,6 +149,7 @@ import Imports
import Network.Wai.Utilities
import qualified System.Logger.Class as Log
import System.Logger.Message
import UnliftIO.Async
import Wire.API.Federation.Client (FederationError (..))
import Wire.API.Routes.Internal.Brig.Connection
import Wire.API.Team.Member (legalHoldStatus)
Expand Down Expand Up @@ -1123,8 +1122,12 @@ userGC u = case (userExpire u) of
deleteUserNoVerify (userId u)
return u

lookupProfile :: UserId -> Qualified UserId -> ExceptT FederationError AppIO (Maybe UserProfile)
lookupProfile self other = listToMaybe <$> lookupProfiles self [other]
lookupProfile :: Local UserId -> Qualified UserId -> ExceptT FederationError AppIO (Maybe UserProfile)
lookupProfile self other =
listToMaybe
<$> lookupProfilesFromDomain
self
(fmap pure other)

-- | Obtain user profiles for a list of users as they can be seen by
-- a given user 'self'. User 'self' can see the 'FullProfile' of any other user 'other',
Expand All @@ -1133,22 +1136,27 @@ lookupProfile self other = listToMaybe <$> lookupProfiles self [other]
-- If 'self' is an unknown 'UserId', return '[]'.
lookupProfiles ::
-- | User 'self' on whose behalf the profiles are requested.
UserId ->
Local UserId ->
-- | The users ('others') for which to obtain the profiles.
[Qualified UserId] ->
ExceptT FederationError AppIO [UserProfile]
lookupProfiles self others = do
localDomain <- viewFederationDomain
let userMap = indexQualified others
-- FUTUREWORK(federation): parallelise federator requests here
fold <$> traverse (uncurry (getProfiles localDomain)) (Map.assocs userMap)
where
getProfiles localDomain domain uids
| localDomain == domain = lift (lookupLocalProfiles (Just self) uids)
| otherwise = lookupRemoteProfiles domain uids

lookupRemoteProfiles :: Domain -> [UserId] -> ExceptT FederationError AppIO [UserProfile]
lookupRemoteProfiles = Federation.getUsersByIds
lookupProfiles self others =
fmap concat $
traverseConcurrentlyWithErrors
(lookupProfilesFromDomain self)
(bucketQualified others)

lookupProfilesFromDomain ::
Local UserId -> Qualified [UserId] -> ExceptT FederationError AppIO [UserProfile]
lookupProfilesFromDomain self =
foldQualified
self
(lift . lookupLocalProfiles (Just (tUnqualified self)) . tUnqualified)
lookupRemoteProfiles

lookupRemoteProfiles :: Remote [UserId] -> ExceptT FederationError AppIO [UserProfile]
lookupRemoteProfiles (qUntagged -> Qualified uids domain) =
Federation.getUsersByIds domain uids

-- FUTUREWORK: This function encodes a few business rules about exposing email
-- ids, but it is also very complex. Maybe this can be made easy by extracting a
Expand Down
Loading