diff --git a/changelog.d/5-internal/polysemy-access-effects b/changelog.d/5-internal/polysemy-access-effects new file mode 100644 index 0000000000..ac3addb66f --- /dev/null +++ b/changelog.d/5-internal/polysemy-access-effects @@ -0,0 +1 @@ +Turn placeholder access effects into actual Polysemy effects. diff --git a/libs/bilge/src/Bilge/IO.hs b/libs/bilge/src/Bilge/IO.hs index b5d12a05cb..9166df1bd7 100644 --- a/libs/bilge/src/Bilge/IO.hs +++ b/libs/bilge/src/Bilge/IO.hs @@ -91,7 +91,7 @@ data Debug Full deriving (Eq, Ord, Show, Enum) -type Http a = HttpT IO a +type Http = HttpT IO newtype HttpT m a = HttpT { unwrap :: ReaderT Manager m a diff --git a/libs/bilge/src/Bilge/RPC.hs b/libs/bilge/src/Bilge/RPC.hs index 04cb232ec3..8e91f0880f 100644 --- a/libs/bilge/src/Bilge/RPC.hs +++ b/libs/bilge/src/Bilge/RPC.hs @@ -45,6 +45,9 @@ import System.Logger.Class class HasRequestId m where getRequestId :: m RequestId +instance Monad m => HasRequestId (ReaderT RequestId m) where + getRequestId = ask + data RPCException = RPCException { rpceRemote :: !LText, rpceRequest :: !Request, diff --git a/services/galley/galley.cabal b/services/galley/galley.cabal index 37905a23de..9110d9df69 100644 --- a/services/galley/galley.cabal +++ b/services/galley/galley.cabal @@ -4,7 +4,7 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: 1b185cc3a9afe5d7a6c21c93d6c031963a5eb924885b6314ffc92ce96c6b545d +-- hash: aaed6006a10580d11a903fa32d0d6d09234867ab992f293833616eb68c7071bb name: galley version: 0.83.0 @@ -71,23 +71,34 @@ library Galley.Data.TeamNotifications Galley.Data.Types Galley.Effects + Galley.Effects.BotAccess + Galley.Effects.BrigAccess Galley.Effects.ClientStore Galley.Effects.CodeStore Galley.Effects.ConversationStore + Galley.Effects.ExternalAccess + Galley.Effects.FederatorAccess Galley.Effects.FireAndForget + Galley.Effects.GundeckAccess Galley.Effects.ListItems Galley.Effects.MemberStore Galley.Effects.Paging Galley.Effects.RemoteConversationListStore Galley.Effects.ServiceStore + Galley.Effects.SparAccess Galley.Effects.TeamMemberStore Galley.Effects.TeamStore Galley.Env Galley.External Galley.External.LegalHoldService + Galley.External.LegalHoldService.Types Galley.Intra.Client + Galley.Intra.Effects + Galley.Intra.Federator + Galley.Intra.Federator.Types Galley.Intra.Journal Galley.Intra.Push + Galley.Intra.Push.Internal Galley.Intra.Spar Galley.Intra.Team Galley.Intra.User @@ -163,6 +174,7 @@ library , saml2-web-sso >=0.18 , servant , servant-client + , servant-client-core , servant-server , servant-swagger , servant-swagger-ui diff --git a/services/galley/package.yaml b/services/galley/package.yaml index ebc59277a5..a46530baf0 100644 --- a/services/galley/package.yaml +++ b/services/galley/package.yaml @@ -76,6 +76,7 @@ library: - retry >=0.5 - safe-exceptions >=0.1 - servant + - servant-client-core - servant-server - servant-swagger - servant-swagger-ui diff --git a/services/galley/src/Galley/API/Clients.hs b/services/galley/src/Galley/API/Clients.hs index 03fc8f75e1..8d209b5765 100644 --- a/services/galley/src/Galley/API/Clients.hs +++ b/services/galley/src/Galley/API/Clients.hs @@ -26,8 +26,8 @@ import Control.Lens (view) import Data.Id import Galley.App import Galley.Effects +import qualified Galley.Effects.BrigAccess as E import qualified Galley.Effects.ClientStore as E -import qualified Galley.Intra.Client as Intra import Galley.Options import Galley.Types.Clients (clientIds, fromUserClients) import Imports @@ -49,9 +49,10 @@ getClients :: getClients usr = do isInternal <- view $ options . optSettings . setIntraListing clts <- - if isInternal - then fromUserClients <$> Intra.lookupClients [usr] - else liftSem $ E.getClients [usr] + liftSem $ + if isInternal + then fromUserClients <$> E.lookupClients [usr] + else E.getClients [usr] return $ clientIds usr clts addClientH :: diff --git a/services/galley/src/Galley/API/Create.hs b/services/galley/src/Galley/API/Create.hs index 3bc6e8f2dd..39f75a9733 100644 --- a/services/galley/src/Galley/API/Create.hs +++ b/services/galley/src/Galley/API/Create.hs @@ -44,6 +44,7 @@ import qualified Galley.Data.Conversation as Data import Galley.Data.Conversation.Types import Galley.Effects import qualified Galley.Effects.ConversationStore as E +import qualified Galley.Effects.GundeckAccess as E import qualified Galley.Effects.MemberStore as E import qualified Galley.Effects.TeamStore as E import Galley.Intra.Push @@ -390,7 +391,7 @@ createLegacyConnectConversation lusr conn lrecipient j = do e = Event ConvConnect (qUntagged lcid) (qUntagged lusr) now (EdConnect j) notifyCreatedConversation Nothing (tUnqualified lusr) conn c for_ (newPushLocal ListComplete (tUnqualified lusr) (ConvEvent e) (recipient <$> Data.convLocalMembers c)) $ \p -> - push1 $ + liftSem . E.push1 $ p & pushRoute .~ RouteDirect & pushConn .~ conn @@ -431,7 +432,7 @@ createLegacyConnectConversation lusr conn lrecipient j = do t <- liftIO getCurrentTime let e = Event ConvConnect qconv (qUntagged lusr) t (EdConnect j) for_ (newPushLocal ListComplete (tUnqualified lusr) (ConvEvent e) (recipient <$> Data.convLocalMembers conv)) $ \p -> - push1 $ + liftSem . E.push1 $ p & pushRoute .~ RouteDirect & pushConn .~ conn @@ -469,7 +470,7 @@ notifyCreatedConversation dtime usr conn c = do -- of being added to a conversation registerRemoteConversationMemberships now localDomain c -- Notify local users - pushSome =<< mapM (toPush localDomain now) (Data.convLocalMembers c) + liftSem . E.push =<< mapM (toPush localDomain now) (Data.convLocalMembers c) where route | Data.convType c == RegularConv = RouteAny diff --git a/services/galley/src/Galley/API/Federation.hs b/services/galley/src/Galley/API/Federation.hs index c0d81ce437..fcadd8f171 100644 --- a/services/galley/src/Galley/API/Federation.hs +++ b/services/galley/src/Galley/API/Federation.hs @@ -41,9 +41,9 @@ import Galley.API.Util import Galley.App import qualified Galley.Data.Conversation as Data import Galley.Effects +import qualified Galley.Effects.BrigAccess as E import qualified Galley.Effects.ConversationStore as E import qualified Galley.Effects.MemberStore as E -import Galley.Intra.User (getConnections) import Galley.Types.Conversations.Members (LocalMember (..), defMemberStatus) import Galley.Types.UserList import Imports @@ -216,7 +216,7 @@ addLocalUsersToRemoteConv :: [UserId] -> Galley r (Set UserId) addLocalUsersToRemoteConv remoteConvId qAdder localUsers = do - connStatus <- getConnections localUsers (Just [qAdder]) (Just Accepted) + connStatus <- liftSem $ E.getConnections localUsers (Just [qAdder]) (Just Accepted) let localUserIdsSet = Set.fromList localUsers connected = Set.fromList $ fmap csv2From connStatus unconnected = Set.difference localUserIdsSet connected diff --git a/services/galley/src/Galley/API/Internal.hs b/services/galley/src/Galley/API/Internal.hs index 2c8ad18c85..efec177340 100644 --- a/services/galley/src/Galley/API/Internal.hs +++ b/services/galley/src/Galley/API/Internal.hs @@ -57,6 +57,7 @@ import qualified Galley.Data.Conversation as Data import Galley.Effects import Galley.Effects.ClientStore import Galley.Effects.ConversationStore +import Galley.Effects.GundeckAccess import Galley.Effects.MemberStore import Galley.Effects.Paging import Galley.Effects.TeamStore @@ -556,7 +557,7 @@ rmUser user conn = do | otherwise -> return Nothing for_ (maybeList1 (catMaybes pp)) - Intra.push + (liftSem . push) leaveRemoteConversations :: Local UserId -> Range 1 FedGalley.UserDeletedNotificationMaxConvs [Remote ConvId] -> Galley r () leaveRemoteConversations lusr cids = do diff --git a/services/galley/src/Galley/API/LegalHold.hs b/services/galley/src/Galley/API/LegalHold.hs index 92ac4bd512..2f6d2a63c7 100644 --- a/services/galley/src/Galley/API/LegalHold.hs +++ b/services/galley/src/Galley/API/LegalHold.hs @@ -58,12 +58,11 @@ import Galley.Data.LegalHold (isTeamLegalholdWhitelisted) import qualified Galley.Data.LegalHold as LegalHoldData import qualified Galley.Data.TeamFeatures as TeamFeatures import Galley.Effects +import Galley.Effects.BrigAccess import Galley.Effects.Paging import Galley.Effects.TeamMemberStore import Galley.Effects.TeamStore import qualified Galley.External.LegalHoldService as LHService -import qualified Galley.Intra.Client as Client -import Galley.Intra.User (getConnectionsUnqualified, putConnectionInternal) import qualified Galley.Options as Opts import Galley.Types (LocalMember, lmConvRoleName, lmId) import Galley.Types.Teams as Team @@ -255,7 +254,7 @@ removeSettings' tid = removeLHForUser :: TeamMember -> Galley r () removeLHForUser member = do let uid = member ^. Team.userId - Client.removeLegalHoldClientFromUser uid + liftSem $ removeLegalHoldClientFromUser uid LHService.removeLegalHold tid uid changeLegalholdStatus tid uid (member ^. legalHoldStatus) UserLegalHoldDisabled -- (support for withdrawing consent is not planned yet.) @@ -425,7 +424,7 @@ requestDevice zusr tid uid = do -- We don't distinguish the last key here; brig will do so when the device is added LegalHoldData.insertPendingPrekeys uid (unpackLastPrekey lastPrekey' : prekeys) changeLegalholdStatus tid uid userLHStatus UserLegalHoldPending - Client.notifyClientsAboutLegalHoldRequest zusr uid lastPrekey' + liftSem $ notifyClientsAboutLegalHoldRequest zusr uid lastPrekey' requestDeviceFromService :: Galley r (LastPrekey, [Prekey]) requestDeviceFromService = do @@ -500,13 +499,13 @@ approveDevice zusr tid uid connId (Public.ApproveLegalHoldForUserRequest mPasswo Log.info $ Log.msg @Text "No prekeys found" throwM noLegalHoldDeviceAllocated Just keys -> pure keys - clientId <- Client.addLegalHoldClientToUser uid connId prekeys lastPrekey' + clientId <- liftSem $ addLegalHoldClientToUser uid connId prekeys lastPrekey' -- Note: teamId could be passed in the getLegalHoldAuthToken request instead of lookup up again - -- Note: both 'Client.getLegalHoldToken' and 'ensureReAuthorized' check the password - -- Note: both 'Client.getLegalHoldToken' and this function in 'assertOnTeam' above + -- Note: both 'getLegalHoldToken' and 'ensureReAuthorized' check the password + -- Note: both 'getLegalHoldToken' and this function in 'assertOnTeam' above -- checks that the user is part of a binding team -- FUTUREWORK: reduce double checks - legalHoldAuthToken <- Client.getLegalHoldAuthToken uid mPassword + legalHoldAuthToken <- liftSem $ getLegalHoldAuthToken uid mPassword LHService.confirmLegalHold clientId tid uid legalHoldAuthToken -- TODO: send event at this point (see also: -- https://github.com/wireapp/wire-server/pull/802#pullrequestreview-262280386) @@ -585,7 +584,7 @@ disableForUser zusr tid uid (Public.DisableLegalHoldForUserRequest mPassword) = disableLH :: UserLegalHoldStatus -> Galley r () disableLH userLHStatus = do ensureReAuthorised zusr mPassword - Client.removeLegalHoldClientFromUser uid + liftSem $ removeLegalHoldClientFromUser uid LHService.removeLegalHold tid uid -- TODO: send event at this point (see also: related TODO in this module in -- 'approveDevice' and @@ -642,7 +641,7 @@ changeLegalholdStatus tid uid old new = do UserLegalHoldNoConsent -> noop where update = LegalHoldData.setUserLegalHoldStatus tid uid new - removeblocks = void $ putConnectionInternal (RemoveLHBlocksInvolving uid) + removeblocks = void . liftSem $ putConnectionInternal (RemoveLHBlocksInvolving uid) addblocks = do blockNonConsentingConnections uid handleGroupConvPolicyConflicts uid new @@ -656,7 +655,7 @@ blockNonConsentingConnections :: UserId -> Galley r () blockNonConsentingConnections uid = do - conns <- getConnectionsUnqualified [uid] Nothing Nothing + conns <- liftSem $ getConnectionsUnqualified [uid] Nothing Nothing errmsgs <- do conflicts <- mconcat <$> findConflicts conns blockConflicts uid conflicts @@ -677,7 +676,7 @@ blockNonConsentingConnections uid = do blockConflicts :: UserId -> [UserId] -> Galley r [String] blockConflicts _ [] = pure [] blockConflicts userLegalhold othersToBlock@(_ : _) = do - status <- putConnectionInternal (BlockForMissingLHConsent userLegalhold othersToBlock) + status <- liftSem $ putConnectionInternal (BlockForMissingLHConsent userLegalhold othersToBlock) pure $ ["blocking users failed: " <> show (status, othersToBlock) | status /= status200] setTeamLegalholdWhitelisted :: TeamId -> Galley r () diff --git a/services/galley/src/Galley/API/LegalHold/Conflicts.hs b/services/galley/src/Galley/API/LegalHold/Conflicts.hs index e780222cbe..11995a36fa 100644 --- a/services/galley/src/Galley/API/LegalHold/Conflicts.hs +++ b/services/galley/src/Galley/API/LegalHold/Conflicts.hs @@ -30,9 +30,8 @@ import qualified Data.Set as Set import Galley.API.Util import Galley.App import Galley.Effects +import Galley.Effects.BrigAccess import Galley.Effects.TeamStore -import qualified Galley.Intra.Client as Intra -import Galley.Intra.User (getUser) import Galley.Options import Galley.Types.Teams hiding (self) import Imports @@ -90,7 +89,7 @@ guardLegalholdPolicyConflictsUid self otherClients = runExceptT $ do otherUids = nub $ Map.keys . userClients $ otherClients when (nub otherUids /= [self {- if all other clients belong to us, there can be no conflict -}]) $ do - allClients :: UserClientsFull <- lift $ Intra.lookupClientsFull (nub $ self : otherUids) + allClients :: UserClientsFull <- lift . liftSem $ lookupClientsFull (nub $ self : otherUids) let selfClients :: [Client.Client] = allClients @@ -126,11 +125,11 @@ guardLegalholdPolicyConflictsUid self otherClients = runExceptT $ do . Client.clientCapabilities checkConsentMissing :: Galley r Bool - checkConsentMissing = do + checkConsentMissing = liftSem $ do -- (we could also get the profile from brig. would make the code slightly more -- concise, but not really help with the rpc back-and-forth, so, like, why?) mbUser <- accountUser <$$> getUser self - mbTeamMember <- liftSem $ join <$> for (mbUser >>= userTeam) (`getTeamMember` self) + mbTeamMember <- join <$> for (mbUser >>= userTeam) (`getTeamMember` self) let lhStatus = maybe defUserLegalHoldStatus (view legalHoldStatus) mbTeamMember pure (lhStatus == UserLegalHoldNoConsent) diff --git a/services/galley/src/Galley/API/Message.hs b/services/galley/src/Galley/API/Message.hs index 4c61f56289..4bd82de30d 100644 --- a/services/galley/src/Galley/API/Message.hs +++ b/services/galley/src/Galley/API/Message.hs @@ -26,11 +26,13 @@ import Galley.API.Util import Galley.App import Galley.Data.Services as Data import Galley.Effects +import Galley.Effects.BrigAccess import Galley.Effects.ClientStore import Galley.Effects.ConversationStore +import Galley.Effects.ExternalAccess +import Galley.Effects.FederatorAccess +import Galley.Effects.GundeckAccess hiding (Push) import Galley.Effects.MemberStore -import qualified Galley.External as External -import qualified Galley.Intra.Client as Intra import Galley.Intra.Push import Galley.Options (optSettings, setIntraListing) import qualified Galley.Types.Clients as Clients @@ -41,7 +43,7 @@ import qualified System.Logger.Class as Log import Wire.API.Event.Conversation import qualified Wire.API.Federation.API.Brig as FederatedBrig import qualified Wire.API.Federation.API.Galley as FederatedGalley -import Wire.API.Federation.Client (FederationError, executeFederated) +import Wire.API.Federation.Client (FederationError) import Wire.API.Federation.Error (federationErrorToWai) import Wire.API.Message import Wire.API.Team.LegalHold @@ -182,8 +184,9 @@ getRemoteClients :: Galley r (Map (Domain, UserId) (Set ClientId)) getRemoteClients remoteMembers = -- concatenating maps is correct here, because their sets of keys are disjoint - mconcat . map tUnqualified - <$> runFederatedConcurrently (map rmId remoteMembers) getRemoteClientsFromDomain + liftSem $ + mconcat . map tUnqualified + <$> runFederatedConcurrently (map rmId remoteMembers) getRemoteClientsFromDomain where getRemoteClientsFromDomain (qUntagged -> Qualified uids domain) = Map.mapKeys (domain,) . fmap (Set.map pubClientId) . userMap @@ -192,18 +195,18 @@ getRemoteClients remoteMembers = postRemoteOtrMessage :: Members '[ConversationStore, FederatorAccess] r => Qualified UserId -> - Qualified ConvId -> + Remote ConvId -> LByteString -> Galley r (PostOtrResponse MessageSendingStatus) postRemoteOtrMessage sender conv rawMsg = do let msr = FederatedGalley.MessageSendRequest - { FederatedGalley.msrConvId = qUnqualified conv, + { FederatedGalley.msrConvId = tUnqualified conv, FederatedGalley.msrSender = qUnqualified sender, FederatedGalley.msrRawMessage = Base64ByteString rawMsg } rpc = FederatedGalley.sendMessage FederatedGalley.clientRoutes (qDomain sender) msr - FederatedGalley.msResponse <$> runFederatedGalley (qDomain conv) rpc + liftSem $ FederatedGalley.msResponse <$> runFederated conv rpc postQualifiedOtrMessage :: Members @@ -255,10 +258,10 @@ postQualifiedOtrMessage senderType sender mconn convId msg = runExceptT $ do -- get local clients localClients <- - lift $ + lift . liftSem $ if isInternal - then Clients.fromUserClients <$> Intra.lookupClients localMemberIds - else liftSem $ getClients localMemberIds + then Clients.fromUserClients <$> lookupClients localMemberIds + else getClients localMemberIds let qualifiedLocalClients = Map.mapKeys (localDomain,) . makeUserMap (Set.fromList (map lmId localMembers)) @@ -313,7 +316,7 @@ postQualifiedOtrMessage senderType sender mconn convId msg = runExceptT $ do -- | Send both local and remote messages, return the set of clients for which -- sending has failed. sendMessages :: - Members '[BotAccess, GundeckAccess, ExternalAccess] r => + Members '[BotAccess, GundeckAccess, ExternalAccess, FederatorAccess] r => UTCTime -> Qualified UserId -> ClientId -> @@ -330,7 +333,8 @@ sendMessages now sender senderClient mconn conv localMemberMap metadata messages | localDomain == dom = sendLocalMessages now sender senderClient mconn (Qualified conv localDomain) localMemberMap metadata | otherwise = - sendRemoteMessages dom now sender senderClient conv metadata + sendRemoteMessages (toRemoteUnsafe dom ()) now sender senderClient conv metadata + mkQualifiedUserClientsByDomain <$> Map.traverseWithKey send messageMap where byDomain :: Map (Domain, UserId, ClientId) a -> Map Domain (Map (UserId, ClientId) a) @@ -367,7 +371,9 @@ sendLocalMessages now sender senderClient mconn conv localMemberMap metadata loc pure mempty sendRemoteMessages :: - Domain -> + forall r x. + Member FederatorAccess r => + Remote x -> UTCTime -> Qualified UserId -> ClientId -> @@ -375,7 +381,7 @@ sendRemoteMessages :: MessageMetadata -> Map (UserId, ClientId) Text -> Galley r (Set (UserId, ClientId)) -sendRemoteMessages domain now sender senderClient conv metadata messages = handle <=< runExceptT $ do +sendRemoteMessages domain now sender senderClient conv metadata messages = (handle =<<) $ do let rcpts = foldr (\((u, c), t) -> Map.insertWith (<>) u (Map.singleton c t)) @@ -397,14 +403,14 @@ sendRemoteMessages domain now sender senderClient conv metadata messages = handl -- backend has only one domain so we just pick it from the environment. originDomain <- viewFederationDomain let rpc = FederatedGalley.onMessageSent FederatedGalley.clientRoutes originDomain rm - executeFederated domain rpc + liftSem $ runFederatedEither domain rpc where handle :: Either FederationError a -> Galley r (Set (UserId, ClientId)) handle (Right _) = pure mempty handle (Left e) = do Log.warn $ Log.field "conversation" (toByteString' conv) - Log.~~ Log.field "domain" (toByteString' domain) + Log.~~ Log.field "domain" (toByteString' (tDomain domain)) Log.~~ Log.field "exception" (encode (federationErrorToWai e)) Log.~~ Log.msg ("Remote message sending failed" :: Text) pure (Map.keysSet messages) @@ -444,7 +450,7 @@ runMessagePush :: MessagePush -> Galley r () runMessagePush cnv mp = do - pushSome (userPushes mp) + liftSem $ push (userPushes mp) pushToBots (botPushes mp) where pushToBots :: [(BotMember, Event)] -> Galley r () @@ -453,7 +459,7 @@ runMessagePush cnv mp = do if localDomain /= qDomain cnv then unless (null pushes) $ do Log.warn $ Log.msg ("Ignoring messages for local bots in a remote conversation" :: ByteString) . Log.field "conversation" (show cnv) - else External.deliverAndDeleteAsync (qUnqualified cnv) pushes + else liftSem $ deliverAndDeleteAsync (qUnqualified cnv) pushes newMessageEvent :: Qualified ConvId -> Qualified UserId -> ClientId -> Maybe Text -> UTCTime -> ClientId -> Text -> Event newMessageEvent convId sender senderClient dat time receiverClient cipherText = diff --git a/services/galley/src/Galley/API/Teams.hs b/services/galley/src/Galley/API/Teams.hs index 4b899c4e36..8c32af600c 100644 --- a/services/galley/src/Galley/API/Teams.hs +++ b/services/galley/src/Galley/API/Teams.hs @@ -94,18 +94,18 @@ import qualified Galley.Data.SearchVisibility as SearchVisibilityData import Galley.Data.Services (BotMember) import qualified Galley.Data.TeamFeatures as TeamFeatures import Galley.Effects +import qualified Galley.Effects.BrigAccess as E import qualified Galley.Effects.ConversationStore as E +import qualified Galley.Effects.ExternalAccess as E +import qualified Galley.Effects.GundeckAccess as E import qualified Galley.Effects.ListItems as E import qualified Galley.Effects.MemberStore as E import qualified Galley.Effects.Paging as E +import qualified Galley.Effects.SparAccess as Spar import qualified Galley.Effects.TeamMemberStore as E import qualified Galley.Effects.TeamStore as E -import qualified Galley.External as External import qualified Galley.Intra.Journal as Journal import Galley.Intra.Push -import qualified Galley.Intra.Spar as Spar -import qualified Galley.Intra.Team as BrigTeam -import Galley.Intra.User import Galley.Options import qualified Galley.Options as Opts import qualified Galley.Queue as Q @@ -269,7 +269,7 @@ updateTeamStatus tid (TeamStatusUpdate newStatus cur) = do -- When teams are created, they are activated immediately. In this situation, Brig will -- most likely report team size as 0 due to ES taking some time to index the team creator. -- This is also very difficult to test, so is not tested. - (TeamSize possiblyStaleSize) <- BrigTeam.getSize tid + (TeamSize possiblyStaleSize) <- liftSem $ E.getSize tid let size = if possiblyStaleSize == 0 then 1 @@ -313,7 +313,7 @@ updateTeam zusr zcon tid updateData = do memList <- getTeamMembersForFanout tid let e = newEvent TeamUpdate tid now & eventData .~ Just (EdTeamUpdate updateData) let r = list1 (userRecipient zusr) (membersToRecipients (Just zusr) (memList ^. teamMembers)) - push1 $ newPushLocal1 (memList ^. teamMemberListType) zusr (TeamEvent e) r & pushConn .~ Just zcon + liftSem . E.push1 $ newPushLocal1 (memList ^. teamMemberListType) zusr (TeamEvent e) r & pushConn .~ Just zcon deleteTeamH :: Members '[BrigAccess, TeamStore] r => @@ -379,7 +379,7 @@ uncheckedDeleteTeam :: uncheckedDeleteTeam zusr zcon tid = do team <- liftSem $ E.getTeam tid when (isJust team) $ do - Spar.deleteTeam tid + liftSem $ Spar.deleteTeam tid now <- liftIO getCurrentTime convs <- liftSem $ @@ -393,12 +393,12 @@ uncheckedDeleteTeam zusr zcon tid = do (ue, be) <- foldrM (createConvDeleteEvents now membs) ([], []) convs let e = newEvent TeamDelete tid now pushDeleteEvents membs e ue - External.deliverAsync be + liftSem $ E.deliverAsync be -- TODO: we don't delete bots here, but we should do that, since -- every bot user can only be in a single conversation. Just -- deleting conversations from the database is not enough. when ((view teamBinding . tdTeam <$> team) == Just Binding) $ do - mapM_ (deleteUser . view userId) membs + liftSem $ mapM_ (E.deleteUser . view userId) membs Journal.teamDelete tid Data.unsetTeamLegalholdWhitelisted tid liftSem $ E.deleteTeam tid @@ -410,16 +410,18 @@ uncheckedDeleteTeam zusr zcon tid = do -- To avoid DoS on gundeck, send team deletion events in chunks let chunkSize = fromMaybe defConcurrentDeletionEvents (o ^. setConcurrentDeletionEvents) let chunks = List.chunksOf chunkSize (toList r) - forM_ chunks $ \chunk -> case chunk of - [] -> return () - -- push TeamDelete events. Note that despite having a complete list, we are guaranteed in the - -- push module to never fan this out to more than the limit - x : xs -> push1 (newPushLocal1 ListComplete zusr (TeamEvent e) (list1 x xs) & pushConn .~ zcon) + liftSem $ + forM_ chunks $ \chunk -> case chunk of + [] -> return () + -- push TeamDelete events. Note that despite having a complete list, we are guaranteed in the + -- push module to never fan this out to more than the limit + x : xs -> E.push1 (newPushLocal1 ListComplete zusr (TeamEvent e) (list1 x xs) & pushConn .~ zcon) -- To avoid DoS on gundeck, send conversation deletion events slowly + -- FUTUREWORK: make this behaviour part of the GundeckAccess effect let delay = 1000 * (fromMaybe defDeleteConvThrottleMillis (o ^. setDeleteConvThrottleMillis)) forM_ ue $ \event -> do -- push ConversationDelete events - push1 event + liftSem $ E.push1 event threadDelay delay createConvDeleteEvents :: UTCTime -> @@ -509,8 +511,12 @@ getTeamMembersCSVH (zusr ::: tid ::: _) = do E.withChunks pager $ \members -> do inviters <- lookupInviterHandle members - users <- lookupUser <$> lookupActivatedUsers (fmap (view userId) members) - richInfos <- lookupRichInfo <$> getRichInfoMultiUser (fmap (view userId) members) + users <- + liftSem $ + lookupUser <$> E.lookupActivatedUsers (fmap (view userId) members) + richInfos <- + liftSem $ + lookupRichInfo <$> E.getRichInfoMultiUser (fmap (view userId) members) liftIO $ do writeString ( encodeDefaultOrderedByNameWith @@ -564,7 +570,7 @@ getTeamMembersCSVH (zusr ::: tid ::: _) = do let inviterIds :: [UserId] inviterIds = nub $ catMaybes $ fmap fst . view invitation <$> members - userList :: [User] <- accountUser <$$> getUsers inviterIds + userList :: [User] <- liftSem $ accountUser <$$> E.getUsers inviterIds let userMap :: M.Map UserId Handle.Handle userMap = M.fromList . catMaybes $ extract <$> userList @@ -710,7 +716,7 @@ addTeamMember zusr zcon tid nmem = do ensureNonBindingTeam tid ensureUnboundUsers [uid] ensureConnectedToLocals zusr [uid] - (TeamSize sizeBeforeJoin) <- BrigTeam.getSize tid + (TeamSize sizeBeforeJoin) <- liftSem $ E.getSize tid ensureNotTooLargeForLegalHold tid (fromIntegral sizeBeforeJoin + 1) memList <- getTeamMembersForFanout tid void $ addTeamMemberInternal tid (Just zusr) (Just zcon) nmem memList @@ -732,7 +738,7 @@ uncheckedAddTeamMember :: Galley r () uncheckedAddTeamMember tid nmem = do mems <- getTeamMembersForFanout tid - (TeamSize sizeBeforeJoin) <- BrigTeam.getSize tid + (TeamSize sizeBeforeJoin) <- liftSem $ E.getSize tid ensureNotTooLargeForLegalHold tid (fromIntegral sizeBeforeJoin + 1) (TeamSize sizeBeforeAdd) <- addTeamMemberInternal tid Nothing Nothing nmem mems billingUserIds <- Journal.getBillingUserIds tid $ Just $ newTeamMemberList ((nmem ^. ntmNewTeamMember) : mems ^. teamMembers) (mems ^. teamMemberListType) @@ -800,7 +806,7 @@ updateTeamMember zusr zcon tid targetMember = do updateJournal :: Team -> TeamMemberList -> Galley r () updateJournal team mems = do when (team ^. teamBinding == Binding) $ do - (TeamSize size) <- BrigTeam.getSize tid + (TeamSize size) <- liftSem $ E.getSize tid billingUserIds <- Journal.getBillingUserIds tid $ Just mems Journal.teamUpdate tid size billingUserIds @@ -816,7 +822,7 @@ updateTeamMember zusr zcon tid targetMember = do let ePriv = newEvent MemberUpdate tid now & eventData ?~ privilegedUpdate -- push to all members (user is privileged) let pushPriv = newPushLocal (updatedMembers ^. teamMemberListType) zusr (TeamEvent ePriv) $ privilegedRecipients - for_ pushPriv $ \p -> push1 $ p & pushConn .~ Just zcon + liftSem $ for_ pushPriv $ \p -> E.push1 $ p & pushConn .~ Just zcon deleteTeamMemberH :: Members @@ -874,7 +880,7 @@ deleteTeamMember zusr zcon tid remove mBody = do then do body <- mBody & ifNothing (invalidPayload "missing request body") ensureReAuthorised zusr (body ^. tmdAuthPassword) - (TeamSize sizeBeforeDelete) <- BrigTeam.getSize tid + (TeamSize sizeBeforeDelete) <- liftSem $ E.getSize tid -- TeamSize is 'Natural' and subtracting from 0 is an error -- TeamSize could be reported as 0 if team members are added and removed very quickly, -- which happens in tests @@ -882,7 +888,7 @@ deleteTeamMember zusr zcon tid remove mBody = do if sizeBeforeDelete == 0 then 0 else sizeBeforeDelete - 1 - deleteUser remove + liftSem $ E.deleteUser remove billingUsers <- Journal.getBillingUserIds tid (Just mems) Journal.teamUpdate tid sizeAfterDelete $ filter (/= remove) billingUsers pure TeamMemberDeleteAccepted @@ -919,7 +925,8 @@ uncheckedDeleteTeamMember zusr zcon tid remove mems = do pushMemberLeaveEvent now = do let e = newEvent MemberLeave tid now & eventData ?~ EdMemberLeave remove let r = list1 (userRecipient zusr) (membersToRecipients (Just zusr) (mems ^. teamMembers)) - push1 $ newPushLocal1 (mems ^. teamMemberListType) zusr (TeamEvent e) r & pushConn .~ zcon + liftSem . E.push1 $ + newPushLocal1 (mems ^. teamMemberListType) zusr (TeamEvent e) r & pushConn .~ zcon -- notify all conversation members not in this team. removeFromConvsAndPushConvLeaveEvent :: UTCTime -> Galley r () removeFromConvsAndPushConvLeaveEvent now = do @@ -946,8 +953,8 @@ uncheckedDeleteTeamMember zusr zcon tid remove mems = do let x = filter (\m -> not (Conv.lmId m `Set.member` exceptTo)) users let y = Conv.Event Conv.MemberLeave qconvId qusr now edata for_ (newPushLocal (mems ^. teamMemberListType) zusr (ConvEvent y) (recipient <$> x)) $ \p -> - push1 $ p & pushConn .~ zcon - External.deliverAsync (bots `zip` repeat y) + liftSem . E.push1 $ p & pushConn .~ zcon + liftSem $ E.deliverAsync (bots `zip` repeat y) getTeamConversations :: Member TeamStore r => @@ -1081,7 +1088,7 @@ ensureNotElevated targetPermissions member = ensureNotTooLarge :: Member BrigAccess r => TeamId -> Galley r TeamSize ensureNotTooLarge tid = do o <- view options - (TeamSize size) <- BrigTeam.getSize tid + (TeamSize size) <- liftSem $ E.getSize tid unless (size < fromIntegral (o ^. optSettings . setMaxTeamSize)) $ throwM tooManyTeamMembers return $ TeamSize size @@ -1103,7 +1110,7 @@ ensureNotTooLargeForLegalHold tid teamSize = do ensureNotTooLargeToActivateLegalHold :: Members '[BrigAccess] r => TeamId -> Galley r () ensureNotTooLargeToActivateLegalHold tid = do - (TeamSize teamSize) <- BrigTeam.getSize tid + (TeamSize teamSize) <- liftSem $ E.getSize tid unlessM (teamSizeBelowLimit (fromIntegral teamSize)) $ do throwM cannotEnableLegalHoldServiceLargeTeam @@ -1139,7 +1146,8 @@ addTeamMemberInternal tid origin originConn (view ntmNewTeamMember -> new) memLi luid <- qualifyLocal (new ^. userId) liftSem $ E.createMember lcid luid let e = newEvent MemberJoin tid now & eventData ?~ EdMemberJoin (new ^. userId) - push1 $ newPushLocal1 (memList ^. teamMemberListType) (new ^. userId) (TeamEvent e) (recipients origin new) & pushConn .~ originConn + liftSem . E.push1 $ + newPushLocal1 (memList ^. teamMemberListType) (new ^. userId) (TeamEvent e) (recipients origin new) & pushConn .~ originConn APITeamQueue.pushTeamEvent tid e return sizeBeforeAdd where @@ -1195,7 +1203,7 @@ finishCreateTeam team owner others zcon = do now <- liftIO getCurrentTime let e = newEvent TeamCreate (team ^. teamId) now & eventData ?~ EdTeamCreate team let r = membersToRecipients Nothing others - push1 $ newPushLocal1 ListComplete zusr (TeamEvent e) (list1 (userRecipient zusr) r) & pushConn .~ zcon + liftSem . E.push1 $ newPushLocal1 ListComplete zusr (TeamEvent e) (list1 (userRecipient zusr) r) & pushConn .~ zcon withBindingTeam :: Member TeamStore r => UserId -> (TeamId -> Galley r b) -> Galley r b withBindingTeam zusr callback = do @@ -1226,7 +1234,7 @@ canUserJoinTeam :: Member BrigAccess r => TeamId -> Galley r () canUserJoinTeam tid = do lhEnabled <- isLegalHoldEnabledForTeam tid when lhEnabled $ do - (TeamSize sizeBeforeJoin) <- BrigTeam.getSize tid + (TeamSize sizeBeforeJoin) <- liftSem $ E.getSize tid ensureNotTooLargeForLegalHold tid (fromIntegral sizeBeforeJoin + 1) getTeamSearchVisibilityAvailableInternal :: TeamId -> Galley r (Public.TeamFeatureStatus 'Public.TeamFeatureSearchVisibility) diff --git a/services/galley/src/Galley/API/Teams/Features.hs b/services/galley/src/Galley/API/Teams/Features.hs index 8122a7059d..f3dcd40448 100644 --- a/services/galley/src/Galley/API/Teams/Features.hs +++ b/services/galley/src/Galley/API/Teams/Features.hs @@ -63,9 +63,10 @@ import Galley.Cassandra.Paging import qualified Galley.Data.SearchVisibility as SearchVisibilityData import qualified Galley.Data.TeamFeatures as TeamFeatures import Galley.Effects +import Galley.Effects.GundeckAccess import Galley.Effects.Paging import Galley.Effects.TeamStore -import Galley.Intra.Push (PushEvent (FeatureConfigEvent), newPush, push1) +import Galley.Intra.Push (PushEvent (FeatureConfigEvent), newPush) import Galley.Options import Galley.Types.Teams hiding (newTeam) import Imports @@ -459,7 +460,7 @@ pushFeatureConfigEvent tid event = do let recipients = membersToRecipients Nothing (memList ^. teamMembers) for_ (newPush (memList ^. teamMemberListType) Nothing (FeatureConfigEvent event) recipients) - push1 + (liftSem . push1) -- | (Currently, we only have 'Public.TeamFeatureConferenceCalling' here, but we may have to -- extend this in the future.) diff --git a/services/galley/src/Galley/API/Teams/Notifications.hs b/services/galley/src/Galley/API/Teams/Notifications.hs index 08edac5eb2..d37e42a64b 100644 --- a/services/galley/src/Galley/API/Teams/Notifications.hs +++ b/services/galley/src/Galley/API/Teams/Notifications.hs @@ -52,7 +52,7 @@ import Galley.API.Error import Galley.App import qualified Galley.Data.TeamNotifications as DataTeamQueue import Galley.Effects -import Galley.Intra.User as Intra +import Galley.Effects.BrigAccess as Intra import Galley.Types.Teams hiding (newTeam) import Gundeck.Types.Notification import Imports @@ -67,7 +67,7 @@ getTeamNotifications :: Galley r QueuedNotificationList getTeamNotifications zusr since size = do tid :: TeamId <- do - mtid <- (userTeam . accountUser =<<) <$> Intra.getUser zusr + mtid <- liftSem $ (userTeam . accountUser =<<) <$> Intra.getUser zusr let err = throwM teamNotFound maybe err pure mtid page <- DataTeamQueue.fetch tid since size diff --git a/services/galley/src/Galley/API/Update.hs b/services/galley/src/Galley/API/Update.hs index be63e45d5e..43190d9d83 100644 --- a/services/galley/src/Galley/API/Update.hs +++ b/services/galley/src/Galley/API/Update.hs @@ -98,15 +98,17 @@ import qualified Galley.Data.Conversation as Data import Galley.Data.Services as Data import Galley.Data.Types hiding (Conversation) import Galley.Effects +import qualified Galley.Effects.BotAccess as E +import qualified Galley.Effects.BrigAccess as E import qualified Galley.Effects.ClientStore as E import qualified Galley.Effects.CodeStore as E import qualified Galley.Effects.ConversationStore as E +import qualified Galley.Effects.ExternalAccess as E +import qualified Galley.Effects.FederatorAccess as E +import qualified Galley.Effects.GundeckAccess as E import qualified Galley.Effects.MemberStore as E import qualified Galley.Effects.TeamStore as E -import qualified Galley.External as External -import qualified Galley.Intra.Client as Intra import Galley.Intra.Push -import Galley.Intra.User (deleteBot, getContactList, lookupActivatedUsers) import Galley.Options import Galley.Types import Galley.Types.Bot hiding (addBot) @@ -301,16 +303,16 @@ performAccessUpdateAction qusr conv target = do liftSem $ E.deleteCode key ReusableCode -- Determine bots and members to be removed - let filterBotsAndMembers = filterActivated >=> (liftSem . filterTeammates) + let filterBotsAndMembers = filterActivated >=> filterTeammates let current = convBotsAndMembers conv -- initial bots and members - desired <- lift $ filterBotsAndMembers current -- desired bots and members + desired <- lift . liftSem $ filterBotsAndMembers current -- desired bots and members let toRemove = bmDiff current desired -- bots and members to be removed -- Update Cassandra lift . liftSem $ E.setConversationAccess (tUnqualified lcnv) target lift . fireAndForget $ do -- Remove bots - traverse_ (deleteBot (tUnqualified lcnv)) (map botMemId (toList (bmBots toRemove))) + traverse_ (liftSem . E.deleteBot (tUnqualified lcnv) . botMemId) (bmBots toRemove) -- Update current bots and members let current' = current {bmBots = bmBots desired} @@ -321,12 +323,12 @@ performAccessUpdateAction qusr conv target = do void . runMaybeT $ performAction qusr conv action notifyConversationMetadataUpdate qusr Nothing lcnv current' action where - filterActivated :: BotsAndMembers -> Galley r BotsAndMembers + filterActivated :: BotsAndMembers -> Sem r BotsAndMembers filterActivated bm | ( Data.convAccessRole conv > ActivatedAccessRole && cupAccessRole target <= ActivatedAccessRole ) = do - activated <- map User.userId <$> lookupActivatedUsers (toList (bmLocals bm)) + activated <- map User.userId <$> E.lookupActivatedUsers (toList (bmLocals bm)) -- FUTUREWORK: should we also remove non-activated remote users? pure $ bm {bmLocals = Set.fromList activated} | otherwise = pure bm @@ -1028,10 +1030,10 @@ removeMemberFromRemoteConv :: Maybe ConnId -> Qualified UserId -> Galley r RemoveFromConversationResponse -removeMemberFromRemoteConv (qUntagged -> qcnv) lusr _ victim +removeMemberFromRemoteConv cnv lusr _ victim | qUntagged lusr == victim = do - let lc = FederatedGalley.LeaveConversationRequest (qUnqualified qcnv) (qUnqualified victim) + let lc = FederatedGalley.LeaveConversationRequest (tUnqualified cnv) (qUnqualified victim) let rpc = FederatedGalley.leaveConversation FederatedGalley.clientRoutes @@ -1039,9 +1041,11 @@ removeMemberFromRemoteConv (qUntagged -> qcnv) lusr _ victim lc t <- liftIO getCurrentTime let successEvent = - Event MemberLeave qcnv (qUntagged lusr) t $ + Event MemberLeave (qUntagged cnv) (qUntagged lusr) t $ EdMembersLeave (QualifiedUserIdList [victim]) - mapRight (const successEvent) . FederatedGalley.leaveResponse <$> runFederated (qDomain qcnv) rpc + liftSem $ + mapRight (const successEvent) . FederatedGalley.leaveResponse + <$> E.runFederated cnv rpc | otherwise = pure . Left $ RemoveFromConversationErrorRemovalNotAllowed performRemoveMemberAction :: @@ -1147,11 +1151,12 @@ postProteusMessage :: RawProto Public.QualifiedNewOtrMessage -> Galley r (Public.PostOtrResponse Public.MessageSendingStatus) postProteusMessage zusr zcon conv msg = do - localDomain <- viewFederationDomain - let sender = Qualified zusr localDomain - if localDomain /= qDomain conv - then postRemoteOtrMessage sender conv (rpRaw msg) - else postQualifiedOtrMessage User sender (Just zcon) (qUnqualified conv) (rpValue msg) + sender <- qualifyLocal zusr + foldQualified + sender + (\c -> postQualifiedOtrMessage User (qUntagged sender) (Just zcon) (tUnqualified c) (rpValue msg)) + (\c -> postRemoteOtrMessage (qUntagged sender) c (rpRaw msg)) + conv postOtrMessageUnqualified :: Members @@ -1252,7 +1257,7 @@ postNewOtrBroadcast usr con val msg = do now <- liftIO getCurrentTime withValidOtrBroadcastRecipients usr sender recvrs val now $ \rs -> do let (_, toUsers) = foldr (newMessage qusr con Nothing msg now) ([], []) rs - pushSome (catMaybes toUsers) + liftSem $ E.push (catMaybes toUsers) postNewOtrMessage :: Members @@ -1280,10 +1285,10 @@ postNewOtrMessage utype usr con cnv val msg = do sender = newOtrSender msg recvrs = newOtrRecipients msg now <- liftIO getCurrentTime - withValidOtrRecipients utype usr sender cnv recvrs val now $ \rs -> do + withValidOtrRecipients utype usr sender cnv recvrs val now $ \rs -> liftSem $ do let (toBots, toUsers) = foldr (newMessage qusr con (Just qcnv) msg now) ([], []) rs - pushSome (catMaybes toUsers) - External.deliverAndDeleteAsync cnv toBots + E.push (catMaybes toUsers) + E.deliverAndDeleteAsync cnv toBots newMessage :: Qualified UserId -> @@ -1387,9 +1392,10 @@ notifyConversationMetadataUpdate quid con (qUntagged -> qcnv) targets action = d let e = conversationActionToEvent now quid qcnv action -- notify remote participants - runFederatedConcurrently_ (toList (bmRemotes targets)) $ \ruids -> - FederatedGalley.onConversationUpdated FederatedGalley.clientRoutes localDomain $ - FederatedGalley.ConversationUpdate now quid (qUnqualified qcnv) (tUnqualified ruids) action + liftSem $ + E.runFederatedConcurrently_ (toList (bmRemotes targets)) $ \ruids -> + FederatedGalley.onConversationUpdated FederatedGalley.clientRoutes localDomain $ + FederatedGalley.ConversationUpdate now quid (qUnqualified qcnv) (tUnqualified ruids) action -- notify local participants and bots pushConversationEvent con e (bmLocals targets) (bmBots targets) $> e @@ -1420,7 +1426,7 @@ isTyping zusr zcon cnv typingData = do now <- liftIO getCurrentTime let e = Event Typing qcnv qusr now (EdTyping typingData) for_ (newPushLocal ListComplete zusr (ConvEvent e) (recipient <$> mm)) $ \p -> - push1 $ + liftSem . E.push1 $ p & pushConn ?~ zcon & pushRoute .~ RouteDirect @@ -1493,8 +1499,8 @@ addBot zusr zcon b = do ) ) for_ (newPushLocal ListComplete zusr (ConvEvent e) (recipient <$> users)) $ \p -> - push1 $ p & pushConn ?~ zcon - External.deliverAsync ((bm : bots) `zip` repeat e) + liftSem . E.push1 $ p & pushConn ?~ zcon + liftSem $ E.deliverAsync ((bm : bots) `zip` repeat e) pure e where regularConvChecks lusr c = do @@ -1541,14 +1547,15 @@ rmBot zusr zcon b = do then pure Unchanged else do t <- liftIO getCurrentTime - let evd = EdMembersLeave (QualifiedUserIdList [Qualified (botUserId (b ^. rmBotId)) localDomain]) - let e = Event MemberLeave qcnv qusr t evd - for_ (newPushLocal ListComplete zusr (ConvEvent e) (recipient <$> users)) $ \p -> - push1 $ p & pushConn .~ zcon - liftSem $ E.deleteMembers (Data.convId c) (UserList [botUserId (b ^. rmBotId)] []) - liftSem $ E.deleteClients (botUserId (b ^. rmBotId)) - External.deliverAsync (bots `zip` repeat e) - pure $ Updated e + liftSem $ do + let evd = EdMembersLeave (QualifiedUserIdList [Qualified (botUserId (b ^. rmBotId)) localDomain]) + let e = Event MemberLeave qcnv qusr t evd + for_ (newPushLocal ListComplete zusr (ConvEvent e) (recipient <$> users)) $ \p -> + E.push1 $ p & pushConn .~ zcon + E.deleteMembers (Data.convId c) (UserList [botUserId (b ^. rmBotId)] []) + E.deleteClients (botUserId (b ^. rmBotId)) + E.deliverAsync (bots `zip` repeat e) + pure $ Updated e ------------------------------------------------------------------------------- -- Helpers @@ -1601,13 +1608,14 @@ withValidOtrBroadcastRecipients usr clt rcps val now go = withBindingTeam usr $ fmap (view userId) <$> case val of OtrReportMissing us -> maybeFetchLimitedTeamMemberList limit tid us _ -> maybeFetchAllMembersInTeam tid - contacts <- getContactList usr + contacts <- liftSem $ E.getContactList usr let users = Set.toList $ Set.union (Set.fromList tMembers) (Set.fromList contacts) isInternal <- view $ options . optSettings . setIntraListing clts <- - if isInternal - then Clients.fromUserClients <$> Intra.lookupClients users - else liftSem $ E.getClients users + liftSem $ + if isInternal + then Clients.fromUserClients <$> E.lookupClients users + else E.getClients users let membs = newMember <$> users handleOtrResponse User usr clt rcps membs clts val now go where @@ -1647,9 +1655,10 @@ withValidOtrRecipients utype usr clt cnv rcps val now go = do let localMemberIds = lmId <$> localMembers isInternal <- view $ options . optSettings . setIntraListing clts <- - if isInternal - then Clients.fromUserClients <$> Intra.lookupClients localMemberIds - else liftSem $ E.getClients localMemberIds + liftSem $ + if isInternal + then Clients.fromUserClients <$> E.lookupClients localMemberIds + else E.getClients localMemberIds handleOtrResponse utype usr clt rcps localMembers clts val now go handleOtrResponse :: diff --git a/services/galley/src/Galley/API/Util.hs b/services/galley/src/Galley/API/Util.hs index a95e26c045..17c8a47415 100644 --- a/services/galley/src/Galley/API/Util.hs +++ b/services/galley/src/Galley/API/Util.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE RecordWildCards #-} -- This file is part of the Wire Server implementation. @@ -45,13 +46,15 @@ import Galley.Data.LegalHold (isTeamLegalholdWhitelisted) import Galley.Data.Services (BotMember, newBotMember) import qualified Galley.Data.Types as DataTypes import Galley.Effects +import Galley.Effects.BrigAccess import Galley.Effects.CodeStore import Galley.Effects.ConversationStore +import Galley.Effects.ExternalAccess +import Galley.Effects.FederatorAccess +import Galley.Effects.GundeckAccess import Galley.Effects.MemberStore import Galley.Effects.TeamStore -import qualified Galley.External as External import Galley.Intra.Push -import Galley.Intra.User import Galley.Options (optSettings, setFeatureFlags, setFederationDomain) import Galley.Types import Galley.Types.Conversations.Members (localMemberToOther, remoteMemberToOther) @@ -63,16 +66,11 @@ import Network.HTTP.Types import Network.Wai import Network.Wai.Predicate hiding (Error) import Network.Wai.Utilities -import UnliftIO.Async (concurrently, pooledForConcurrentlyN) import qualified Wire.API.Conversation as Public import Wire.API.Conversation.Action (ConversationAction (..), conversationActionTag) import Wire.API.ErrorDescription -import qualified Wire.API.Federation.API.Brig as FederatedBrig import Wire.API.Federation.API.Galley as FederatedGalley -import Wire.API.Federation.Client (FederationClientFailure, FederatorClient, executeFederated) -import Wire.API.Federation.Error (federationErrorToWai, federationNotImplemented) -import Wire.API.Federation.GRPC.Types (Component (..)) -import qualified Wire.API.User as User +import Wire.API.Federation.Error (federationNotImplemented) type JSON = Media "application" "json" @@ -83,7 +81,7 @@ ensureAccessRole role users = case role of when (any (isNothing . snd) users) $ throwErrorDescriptionType @NotATeamMember ActivatedAccessRole -> do - activated <- lookupActivatedUsers $ map fst users + activated <- liftSem $ lookupActivatedUsers $ map fst users when (length activated /= length users) $ throwErrorDescriptionType @ConvAccessDenied NonActivatedAccessRole -> return () @@ -122,23 +120,25 @@ ensureConnected self others = do ensureConnectedToLocals :: Member BrigAccess r => UserId -> [UserId] -> Galley r () ensureConnectedToLocals _ [] = pure () -ensureConnectedToLocals u uids = liftGalley0 $ do +ensureConnectedToLocals u uids = do (connsFrom, connsTo) <- - getConnectionsUnqualified0 [u] (Just uids) (Just Accepted) - `concurrently` getConnectionsUnqualified0 uids (Just [u]) (Just Accepted) + liftSem $ + getConnectionsUnqualifiedBidi [u] uids (Just Accepted) (Just Accepted) unless (length connsFrom == length uids && length connsTo == length uids) $ throwErrorDescriptionType @NotConnected ensureConnectedToRemotes :: Member BrigAccess r => Local UserId -> [Remote UserId] -> Galley r () ensureConnectedToRemotes _ [] = pure () ensureConnectedToRemotes u remotes = do - acceptedConns <- getConnections [tUnqualified u] (Just $ map qUntagged remotes) (Just Accepted) + acceptedConns <- + liftSem $ + getConnections [tUnqualified u] (Just $ map qUntagged remotes) (Just Accepted) when (length acceptedConns /= length remotes) $ throwErrorDescriptionType @NotConnected ensureReAuthorised :: Member BrigAccess r => UserId -> Maybe PlainTextPassword -> Galley r () ensureReAuthorised u secret = do - reAuthed <- reAuthUser u (ReAuthUser secret) + reAuthed <- liftSem $ reauthUser u (ReAuthUser secret) unless reAuthed $ throwM reAuthFailed @@ -296,7 +296,7 @@ acceptOne2One usr conv conn = do conv' <- if isJust (find ((usr /=) . lmId) mems) then liftSem promote else pure conv let mems' = mems <> toList mm for_ (newPushLocal ListComplete usr (ConvEvent e) (recipient <$> mems')) $ \p -> - push1 $ p & pushConn .~ conn & pushRoute .~ RouteDirect + liftSem $ push1 $ p & pushConn .~ conn & pushRoute .~ RouteDirect return $ conv' {Data.convLocalMembers = mems'} _ -> throwM $ invalidOp "accept: invalid conversation type" where @@ -582,8 +582,8 @@ pushConversationEvent :: pushConversationEvent conn e users bots = do localDomain <- viewFederationDomain for_ (newConversationEventPush localDomain e (toList users)) $ \p -> - push1 $ p & set pushConn conn - External.deliverAsync (toList bots `zip` repeat e) + liftSem $ push1 $ p & set pushConn conn + liftSem $ deliverAsync (toList bots `zip` repeat e) verifyReusableCode :: Member CodeStore r => @@ -628,76 +628,6 @@ viewFederationDomain = view (options . optSettings . setFederationDomain) qualifyLocal :: MonadReader Env m => a -> m (Local a) qualifyLocal a = toLocalUnsafe <$> viewFederationDomain <*> pure a -checkRemoteUsersExist :: - (Member FederatorAccess r, Functor f, Foldable f) => - f (Remote UserId) -> - Galley r () -checkRemoteUsersExist = - -- FUTUREWORK: pooledForConcurrentlyN_ instead of sequential checks per domain - traverse_ checkRemotesFor . bucketRemote - -checkRemotesFor :: Member FederatorAccess r => Remote [UserId] -> Galley r () -checkRemotesFor (qUntagged -> Qualified uids domain) = do - let rpc = FederatedBrig.getUsersByIds FederatedBrig.clientRoutes uids - users <- runFederatedBrig domain rpc - let uids' = - map - (qUnqualified . User.profileQualifiedId) - (filter (not . User.profileDeleted) users) - unless (Set.fromList uids == Set.fromList uids') $ - throwM unknownRemoteUser - -type FederatedGalleyRPC c a = FederatorClient c (ExceptT FederationClientFailure Galley0) a - -runFederated0 :: - forall (c :: Component) a. - Domain -> - FederatedGalleyRPC c a -> - Galley0 a -runFederated0 remoteDomain rpc = do - runExceptT (executeFederated remoteDomain rpc) - >>= either (throwM . federationErrorToWai) pure - -runFederatedGalley :: - Member FederatorAccess r => - Domain -> - FederatedGalleyRPC 'Galley a -> - Galley r a -runFederatedGalley = runFederated - -runFederatedBrig :: - Member FederatorAccess r => - Domain -> - FederatedGalleyRPC 'Brig a -> - Galley r a -runFederatedBrig = runFederated - -runFederated :: - forall (c :: Component) r a. - Member FederatorAccess r => - Domain -> - FederatedGalleyRPC c a -> - Galley r a -runFederated remoteDomain = liftGalley0 . runFederated0 remoteDomain - -runFederatedConcurrently :: - Member FederatorAccess r => - (Foldable f, Functor f) => - f (Remote a) -> - (Remote [a] -> FederatedGalleyRPC c b) -> - Galley r [Remote b] -runFederatedConcurrently xs rpc = liftGalley0 $ - pooledForConcurrentlyN 8 (bucketRemote xs) $ \r -> - qualifyAs r <$> runFederated0 (tDomain r) (rpc r) - -runFederatedConcurrently_ :: - Member FederatorAccess r => - (Foldable f, Functor f) => - f (Remote a) -> - (Remote [a] -> FederatedGalleyRPC c ()) -> - Galley r () -runFederatedConcurrently_ xs = void . runFederatedConcurrently xs - -- | Convert an internal conversation representation 'Data.Conversation' to -- 'NewRemoteConversation' to be sent over the wire to a remote backend that will -- reconstruct this into multiple public-facing @@ -807,7 +737,7 @@ registerRemoteConversationMemberships :: Domain -> Data.Conversation -> Galley r () -registerRemoteConversationMemberships now localDomain c = do +registerRemoteConversationMemberships now localDomain c = liftSem $ do let allRemoteMembers = nubOrd (map rmId (Data.convRemoteMembers c)) rc = toNewRemoteConversation now localDomain c runFederatedConcurrently_ allRemoteMembers $ \_ -> diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index ad7436253e..cc0dfcd04b 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -50,7 +50,6 @@ module Galley.App fromJsonBody, fromOptionalJsonBody, fromProtoBody, - initExtEnv, fanoutLimit, currentFanoutLimit, @@ -95,8 +94,12 @@ import Galley.Cassandra.ConversationList import Galley.Cassandra.Services import Galley.Cassandra.Team import Galley.Effects +import Galley.Effects.FireAndForget (interpretFireAndForget) import qualified Galley.Effects.FireAndForget as E import Galley.Env +import Galley.External +import Galley.Intra.Effects +import Galley.Intra.Federator import Galley.Options import qualified Galley.Queue as Q import qualified Galley.Types.Teams as Teams @@ -110,7 +113,6 @@ import Network.Wai import Network.Wai.Utilities hiding (Error) import qualified Network.Wai.Utilities as WaiError import qualified Network.Wai.Utilities.Server as Server -import OpenSSL.EVP.Digest (getDigestByName) import OpenSSL.Session as Ssl import qualified OpenSSL.X509.SystemStore as Ssl import Polysemy @@ -169,12 +171,6 @@ instance HasFederatorConfig (Galley r) where fanoutLimit :: Galley r (Range 1 Teams.HardTruncationLimit Int32) fanoutLimit = view options >>= return . currentFanoutLimit -currentFanoutLimit :: Opts -> Range 1 Teams.HardTruncationLimit Int32 -currentFanoutLimit o = do - let optFanoutLimit = fromIntegral . fromRange $ fromMaybe defFanoutLimit (o ^. optSettings ^. setMaxFanoutSize) - let maxTeamSize = fromIntegral (o ^. optSettings ^. setMaxTeamSize) - unsafeRange (min maxTeamSize optFanoutLimit) - -- Define some invariants for the options used validateOptions :: Logger -> Opts -> IO () validateOptions l o = do @@ -256,29 +252,6 @@ initHttpManager o = do managerIdleConnectionCount = 3 * (o ^. optSettings . setHttpPoolSize) } --- TODO: somewhat duplicates Brig.App.initExtGetManager -initExtEnv :: IO ExtEnv -initExtEnv = do - ctx <- Ssl.context - Ssl.contextSetVerificationMode ctx Ssl.VerifyNone - Ssl.contextAddOption ctx SSL_OP_NO_SSLv2 - Ssl.contextAddOption ctx SSL_OP_NO_SSLv3 - Ssl.contextAddOption ctx SSL_OP_NO_TLSv1 - Ssl.contextSetCiphers ctx rsaCiphers - Ssl.contextLoadSystemCerts ctx - mgr <- - newManager - (opensslManagerSettings (pure ctx)) - { managerResponseTimeout = responseTimeoutMicro 10000000, - managerConnCount = 100 - } - Just sha <- getDigestByName "SHA256" - return $ ExtEnv (mgr, mkVerify sha) - where - mkVerify sha fprs = - let pinset = map toByteString' fprs - in verifyRsaFingerprint sha pinset - runGalley :: Env -> Request -> Galley GalleyEffects a -> IO a runGalley e r m = let e' = reqId .~ lookupReqId r $ e @@ -306,10 +279,6 @@ evalGalley e = evalGalley0 e . unGalley . interpretGalleyToGalley0 lookupReqId :: Request -> RequestId lookupReqId = maybe def RequestId . lookup requestIdName . requestHeaders -reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg -reqIdMsg = ("request" .=) . unRequestId -{-# INLINE reqIdMsg #-} - fromJsonBody :: FromJSON a => JsonRequest a -> Galley r a fromJsonBody r = exceptT (throwM . invalidPayload) return (parseBody r) {-# INLINE fromJsonBody #-} @@ -370,13 +339,12 @@ interpretGalleyToGalley0 = . interpretCodeStoreToCassandra . interpretClientStoreToCassandra . interpretFireAndForget - . interpretIntra - . interpretBot - . interpretFederator - . interpretExternal - . interpretSpar - . interpretGundeck - . interpretBrig + . interpretBotAccess + . interpretFederatorAccess + . interpretExternalAccess + . interpretSparAccess + . interpretGundeckAccess + . interpretBrigAccess . unGalley ---------------------------------------------------------------------------------- diff --git a/services/galley/src/Galley/Data/LegalHold.hs b/services/galley/src/Galley/Data/LegalHold.hs index 62171d21ed..052bc06421 100644 --- a/services/galley/src/Galley/Data/LegalHold.hs +++ b/services/galley/src/Galley/Data/LegalHold.hs @@ -36,10 +36,10 @@ import Cassandra import Control.Lens (unsnoc, view) import Data.Id import Data.LegalHold -import Galley.App (Env, options) import qualified Galley.Cassandra.LegalHold as C import Galley.Data.Instances () import Galley.Data.Queries as Q +import Galley.Env import qualified Galley.Options as Opts import Galley.Types.Teams (flagLegalHold) import Imports diff --git a/services/galley/src/Galley/Effects.hs b/services/galley/src/Galley/Effects.hs index 4830b635e7..4058b1ea44 100644 --- a/services/galley/src/Galley/Effects.hs +++ b/services/galley/src/Galley/Effects.hs @@ -19,37 +19,18 @@ module Galley.Effects ( -- * Effects needed in Galley GalleyEffects1, - -- * Internal services - Intra, - interpretIntra, - - -- * Brig + -- * Effects to access the Intra API + BotAccess, BrigAccess, - interpretBrig, - - -- * Federator FederatorAccess, - interpretFederator, - - -- * Spar - SparAccess, - interpretSpar, - - -- * Gundeck GundeckAccess, - interpretGundeck, + SparAccess, -- * External services ExternalAccess, - interpretExternal, - - -- * Bot API - BotAccess, - interpretBot, -- * Fire-and-forget async FireAndForget, - interpretFireAndForget, -- * Store effects ClientStore, @@ -72,53 +53,23 @@ where import Data.Id import Data.Qualified import Galley.Cassandra.Paging +import Galley.Effects.BotAccess +import Galley.Effects.BrigAccess import Galley.Effects.ClientStore import Galley.Effects.CodeStore import Galley.Effects.ConversationStore +import Galley.Effects.ExternalAccess +import Galley.Effects.FederatorAccess import Galley.Effects.FireAndForget +import Galley.Effects.GundeckAccess import Galley.Effects.ListItems import Galley.Effects.MemberStore import Galley.Effects.ServiceStore +import Galley.Effects.SparAccess import Galley.Effects.TeamMemberStore import Galley.Effects.TeamStore -import Imports import Polysemy -data Intra m a - -interpretIntra :: Sem (Intra ': r) a -> Sem r a -interpretIntra = interpret $ \case - -data BrigAccess m a - -interpretBrig :: Sem (BrigAccess ': r) a -> Sem r a -interpretBrig = interpret $ \case - -data GundeckAccess m a - -interpretGundeck :: Sem (GundeckAccess ': r) a -> Sem r a -interpretGundeck = interpret $ \case - -data ExternalAccess m a - -interpretExternal :: Sem (ExternalAccess ': r) a -> Sem r a -interpretExternal = interpret $ \case - -data FederatorAccess m a - -interpretFederator :: Sem (FederatorAccess ': r) a -> Sem r a -interpretFederator = interpret $ \case - -data SparAccess m a - -interpretSpar :: Sem (SparAccess ': r) a -> Sem r a -interpretSpar = interpret $ \case - -data BotAccess m a - -interpretBot :: Sem (BotAccess ': r) a -> Sem r a -interpretBot = interpret $ \case - -- All the possible high-level effects. type GalleyEffects1 = '[ BrigAccess, @@ -127,7 +78,6 @@ type GalleyEffects1 = ExternalAccess, FederatorAccess, BotAccess, - Intra, FireAndForget, ClientStore, CodeStore, diff --git a/services/galley/src/Galley/Effects/BotAccess.hs b/services/galley/src/Galley/Effects/BotAccess.hs new file mode 100644 index 0000000000..819fde4908 --- /dev/null +++ b/services/galley/src/Galley/Effects/BotAccess.hs @@ -0,0 +1,26 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Effects.BotAccess where + +import Data.Id +import Polysemy + +data BotAccess m a where + DeleteBot :: ConvId -> BotId -> BotAccess m () + +makeSem ''BotAccess diff --git a/services/galley/src/Galley/Effects/BrigAccess.hs b/services/galley/src/Galley/Effects/BrigAccess.hs new file mode 100644 index 0000000000..5741e3b8b1 --- /dev/null +++ b/services/galley/src/Galley/Effects/BrigAccess.hs @@ -0,0 +1,113 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Effects.BrigAccess + ( -- * Brig access effect + BrigAccess (..), + + -- * Connections + getConnectionsUnqualified, + getConnectionsUnqualifiedBidi, + getConnections, + putConnectionInternal, + + -- * Users + reauthUser, + lookupActivatedUsers, + getUser, + getUsers, + deleteUser, + getContactList, + getRichInfoMultiUser, + + -- * Teams + getSize, + + -- * Clients + lookupClients, + lookupClientsFull, + notifyClientsAboutLegalHoldRequest, + getLegalHoldAuthToken, + addLegalHoldClientToUser, + removeLegalHoldClientFromUser, + ) +where + +import Brig.Types.Client +import Brig.Types.Connection +import Brig.Types.Intra +import Brig.Types.User +import Data.Id +import Data.Misc +import Data.Qualified +import Galley.External.LegalHoldService.Types +import Imports +import Network.HTTP.Types.Status +import Polysemy +import Wire.API.Routes.Internal.Brig.Connection +import Wire.API.Team.Size +import Wire.API.User.Client +import Wire.API.User.RichInfo + +data BrigAccess m a where + GetConnectionsUnqualified :: + [UserId] -> + Maybe [UserId] -> + Maybe Relation -> + BrigAccess m [ConnectionStatus] + GetConnectionsUnqualifiedBidi :: + [UserId] -> + [UserId] -> + Maybe Relation -> + Maybe Relation -> + BrigAccess m ([ConnectionStatus], [ConnectionStatus]) + GetConnections :: + [UserId] -> + Maybe [Qualified UserId] -> + Maybe Relation -> + BrigAccess m [ConnectionStatusV2] + PutConnectionInternal :: UpdateConnectionsInternal -> BrigAccess m Status + ReauthUser :: UserId -> ReAuthUser -> BrigAccess m Bool + LookupActivatedUsers :: [UserId] -> BrigAccess m [User] + GetUsers :: [UserId] -> BrigAccess m [UserAccount] + DeleteUser :: UserId -> BrigAccess m () + GetContactList :: UserId -> BrigAccess m [UserId] + GetRichInfoMultiUser :: [UserId] -> BrigAccess m [(UserId, RichInfo)] + GetSize :: TeamId -> BrigAccess m TeamSize + LookupClients :: [UserId] -> BrigAccess m UserClients + LookupClientsFull :: [UserId] -> BrigAccess m UserClientsFull + NotifyClientsAboutLegalHoldRequest :: + UserId -> + UserId -> + LastPrekey -> + BrigAccess m () + GetLegalHoldAuthToken :: + UserId -> + Maybe PlainTextPassword -> + BrigAccess m OpaqueAuthToken + AddLegalHoldClientToUser :: + UserId -> + ConnId -> + [Prekey] -> + LastPrekey -> + BrigAccess m ClientId + RemoveLegalHoldClientFromUser :: UserId -> BrigAccess m () + +makeSem ''BrigAccess + +getUser :: Member BrigAccess r => UserId -> Sem r (Maybe UserAccount) +getUser = fmap listToMaybe . getUsers . pure diff --git a/services/galley/src/Galley/Effects/ExternalAccess.hs b/services/galley/src/Galley/Effects/ExternalAccess.hs new file mode 100644 index 0000000000..81889aed8a --- /dev/null +++ b/services/galley/src/Galley/Effects/ExternalAccess.hs @@ -0,0 +1,38 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Effects.ExternalAccess + ( -- * External access effect + ExternalAccess (..), + deliver, + deliverAsync, + deliverAndDeleteAsync, + ) +where + +import Data.Id +import Galley.Data.Services +import Imports +import Polysemy +import Wire.API.Event.Conversation + +data ExternalAccess m a where + Deliver :: Foldable f => f (BotMember, Event) -> ExternalAccess m [BotMember] + DeliverAsync :: Foldable f => f (BotMember, Event) -> ExternalAccess m () + DeliverAndDeleteAsync :: Foldable f => ConvId -> f (BotMember, Event) -> ExternalAccess m () + +makeSem ''ExternalAccess diff --git a/services/galley/src/Galley/Effects/FederatorAccess.hs b/services/galley/src/Galley/Effects/FederatorAccess.hs new file mode 100644 index 0000000000..9a31cd3e09 --- /dev/null +++ b/services/galley/src/Galley/Effects/FederatorAccess.hs @@ -0,0 +1,60 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Effects.FederatorAccess + ( -- * Federator access effect + FederatorAccess (..), + runFederated, + runFederatedEither, + runFederatedConcurrently, + runFederatedConcurrently_, + ) +where + +import Data.Qualified +import Galley.Intra.Federator.Types +import Imports +import Polysemy +import Wire.API.Federation.Client +import Wire.API.Federation.GRPC.Types + +data FederatorAccess m a where + RunFederated :: + forall (c :: Component) a m x. + Remote x -> + FederatedRPC c a -> + FederatorAccess m a + RunFederatedEither :: + forall (c :: Component) a m x. + Remote x -> + FederatedRPC c a -> + FederatorAccess m (Either FederationError a) + RunFederatedConcurrently :: + forall (c :: Component) f a m x. + (Foldable f, Functor f) => + f (Remote x) -> + (Remote [x] -> FederatedRPC c a) -> + FederatorAccess m [Remote a] + +makeSem ''FederatorAccess + +runFederatedConcurrently_ :: + (Foldable f, Functor f, Member FederatorAccess r) => + f (Remote a) -> + (Remote [a] -> FederatedRPC c ()) -> + Sem r () +runFederatedConcurrently_ xs = void . runFederatedConcurrently xs diff --git a/services/galley/src/Galley/Effects/FireAndForget.hs b/services/galley/src/Galley/Effects/FireAndForget.hs index 4b614862a3..73ff93d377 100644 --- a/services/galley/src/Galley/Effects/FireAndForget.hs +++ b/services/galley/src/Galley/Effects/FireAndForget.hs @@ -37,6 +37,10 @@ makeSem ''FireAndForget fireAndForget :: Member FireAndForget r => Sem r () -> Sem r () fireAndForget = fireAndForgetOne +-- | Run actions in separate threads and ignore results. +-- +-- /Note/: this will also ignore any state and error effects contained in the +-- 'FireAndForget' action. Use with care. interpretFireAndForget :: Member (Final IO) r => Sem (FireAndForget ': r) a -> Sem r a interpretFireAndForget = interpretFinal @IO $ \case FireAndForgetOne action -> do diff --git a/services/galley/src/Galley/Effects/GundeckAccess.hs b/services/galley/src/Galley/Effects/GundeckAccess.hs new file mode 100644 index 0000000000..1f035ff1a8 --- /dev/null +++ b/services/galley/src/Galley/Effects/GundeckAccess.hs @@ -0,0 +1,38 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Effects.GundeckAccess + ( -- * Gundeck access effect + GundeckAccess (..), + push, + push1, + ) +where + +import qualified Galley.Intra.Push as G +import Imports +import Polysemy + +data GundeckAccess m a where + Push :: Foldable f => f G.Push -> GundeckAccess m () + +makeSem ''GundeckAccess + +-- | Asynchronously send a single push, chunking it into multiple +-- requests if there are more than 128 recipients. +push1 :: Member GundeckAccess r => G.Push -> Sem r () +push1 x = push [x] diff --git a/services/galley/src/Galley/Effects/SparAccess.hs b/services/galley/src/Galley/Effects/SparAccess.hs new file mode 100644 index 0000000000..b8479858aa --- /dev/null +++ b/services/galley/src/Galley/Effects/SparAccess.hs @@ -0,0 +1,26 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Effects.SparAccess where + +import Data.Id +import Polysemy + +data SparAccess m a where + DeleteTeam :: TeamId -> SparAccess m () + +makeSem ''SparAccess diff --git a/services/galley/src/Galley/Env.hs b/services/galley/src/Galley/Env.hs index 3be666ff2d..bff564836e 100644 --- a/services/galley/src/Galley/Env.hs +++ b/services/galley/src/Galley/Env.hs @@ -18,16 +18,23 @@ module Galley.Env where import Cassandra -import Control.Lens +import Control.Lens hiding ((.=)) +import Data.ByteString.Conversion (toByteString') import Data.Id import Data.Metrics.Middleware import Data.Misc (Fingerprint, Rsa) +import Data.Range import qualified Galley.Aws as Aws import Galley.Options import qualified Galley.Queue as Q +import qualified Galley.Types.Teams as Teams import Imports import Network.HTTP.Client +import Network.HTTP.Client.OpenSSL +import OpenSSL.EVP.Digest import OpenSSL.Session as Ssl +import qualified OpenSSL.X509.SystemStore as Ssl +import Ssl.Util import System.Logger import Util.Options @@ -58,3 +65,36 @@ data ExtEnv = ExtEnv makeLenses ''Env makeLenses ''ExtEnv + +-- TODO: somewhat duplicates Brig.App.initExtGetManager +initExtEnv :: IO ExtEnv +initExtEnv = do + ctx <- Ssl.context + Ssl.contextSetVerificationMode ctx Ssl.VerifyNone + Ssl.contextAddOption ctx SSL_OP_NO_SSLv2 + Ssl.contextAddOption ctx SSL_OP_NO_SSLv3 + Ssl.contextAddOption ctx SSL_OP_NO_TLSv1 + Ssl.contextSetCiphers ctx rsaCiphers + Ssl.contextLoadSystemCerts ctx + mgr <- + newManager + (opensslManagerSettings (pure ctx)) + { managerResponseTimeout = responseTimeoutMicro 10000000, + managerConnCount = 100 + } + Just sha <- getDigestByName "SHA256" + return $ ExtEnv (mgr, mkVerify sha) + where + mkVerify sha fprs = + let pinset = map toByteString' fprs + in verifyRsaFingerprint sha pinset + +reqIdMsg :: RequestId -> Msg -> Msg +reqIdMsg = ("request" .=) . unRequestId +{-# INLINE reqIdMsg #-} + +currentFanoutLimit :: Opts -> Range 1 Teams.HardTruncationLimit Int32 +currentFanoutLimit o = do + let optFanoutLimit = fromIntegral . fromRange $ fromMaybe defFanoutLimit (o ^. optSettings ^. setMaxFanoutSize) + let maxTeamSize = fromIntegral (o ^. optSettings ^. setMaxTeamSize) + unsafeRange (min maxTeamSize optFanoutLimit) diff --git a/services/galley/src/Galley/External.hs b/services/galley/src/Galley/External.hs index 63e1cad55b..e325f3da58 100644 --- a/services/galley/src/Galley/External.hs +++ b/services/galley/src/Galley/External.hs @@ -15,12 +15,7 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Galley.External - ( deliver, - deliverAndDeleteAsync, - deliverAsync, - ) -where +module Galley.External (interpretExternalAccess) where import Bilge.Request import Bilge.Retry (httpHandlers) @@ -29,58 +24,59 @@ import Control.Retry import Data.ByteString.Conversion.To import Data.Id import Data.Misc -import Galley.App import Galley.Cassandra.Services import Galley.Data.Services (BotMember, botMemId, botMemService) import Galley.Effects +import Galley.Effects.ExternalAccess (ExternalAccess (..)) +import Galley.Env import Galley.Intra.User +import Galley.Intra.Util import Galley.Types (Event) import Galley.Types.Bot import Imports import qualified Network.HTTP.Client as Http import Network.HTTP.Types.Method import Network.HTTP.Types.Status (status410) +import Polysemy +import qualified Polysemy.Reader as P import Ssl.Util (withVerifiedSslConnection) import qualified System.Logger.Class as Log import System.Logger.Message (field, msg, val, (~~)) import URI.ByteString import UnliftIO (Async, async, waitCatch) +interpretExternalAccess :: + Members '[Embed IO, P.Reader Env] r => + Sem (ExternalAccess ': r) a -> + Sem r a +interpretExternalAccess = interpret $ \case + Deliver pp -> embedIntra $ deliver (toList pp) + DeliverAsync pp -> embedIntra $ deliverAsync (toList pp) + DeliverAndDeleteAsync cid pp -> embedIntra $ deliverAndDeleteAsync cid (toList pp) + -- | Like deliver, but ignore orphaned bots and return immediately. -- -- FUTUREWORK: Check if this can be removed. -deliverAsync :: Member ExternalAccess r => [(BotMember, Event)] -> Galley r () -deliverAsync = liftGalley0 . void . forkIO . void . deliver0 +deliverAsync :: [(BotMember, Event)] -> IntraM () +deliverAsync = void . forkIO . void . deliver -- | Like deliver, but remove orphaned bots and return immediately. -deliverAndDeleteAsync :: - Members '[ExternalAccess, BotAccess] r => - ConvId -> - [(BotMember, Event)] -> - Galley r () -deliverAndDeleteAsync cnv pushes = liftGalley0 . void . forkIO $ do - gone <- liftGalley0 $ deliver0 pushes - mapM_ (deleteBot0 cnv . botMemId) gone - --- | Deliver events to external (bot) services. --- --- Returns those bots which are found to be orphaned by the external --- service, e.g. when the service tells us that it no longer knows about the --- bot. -deliver :: Member ExternalAccess r => [(BotMember, Event)] -> Galley r [BotMember] -deliver = liftGalley0 . deliver0 +deliverAndDeleteAsync :: ConvId -> [(BotMember, Event)] -> IntraM () +deliverAndDeleteAsync cnv pushes = void . forkIO $ do + gone <- deliver pushes + mapM_ (deleteBot cnv . botMemId) gone -deliver0 :: [(BotMember, Event)] -> Galley0 [BotMember] -deliver0 pp = mapM (async . exec) pp >>= foldM eval [] . zip (map fst pp) +deliver :: [(BotMember, Event)] -> IntraM [BotMember] +deliver pp = mapM (async . exec) pp >>= foldM eval [] . zip (map fst pp) where - exec :: (BotMember, Event) -> Galley0 Bool + exec :: (BotMember, Event) -> IntraM Bool exec (b, e) = lookupService (botMemService b) >>= \case Nothing -> return False Just s -> do deliver1 s b e return True - eval :: [BotMember] -> (BotMember, Async Bool) -> Galley r [BotMember] + eval :: [BotMember] -> (BotMember, Async Bool) -> IntraM [BotMember] eval gone (b, a) = do let s = botMemService b r <- waitCatch a @@ -119,7 +115,7 @@ deliver0 pp = mapM (async . exec) pp >>= foldM eval [] . zip (map fst pp) -- Internal ------------------------------------------------------------------- -deliver1 :: Service -> BotMember -> Event -> Galley0 () +deliver1 :: Service -> BotMember -> Event -> IntraM () deliver1 s bm e | s ^. serviceEnabled = do let t = toByteString' (s ^. serviceToken) @@ -149,7 +145,7 @@ urlPort (HttpsUrl u) = do p <- a ^. authorityPortL return (fromIntegral (p ^. portNumberL)) -sendMessage :: [Fingerprint Rsa] -> (Request -> Request) -> Galley r () +sendMessage :: [Fingerprint Rsa] -> (Request -> Request) -> IntraM () sendMessage fprs reqBuilder = do (man, verifyFingerprints) <- view (extEnv . extGetManager) liftIO . withVerifiedSslConnection (verifyFingerprints fprs) man reqBuilder $ \req -> diff --git a/services/galley/src/Galley/External/LegalHoldService.hs b/services/galley/src/Galley/External/LegalHoldService.hs index 133b4cf413..ac503fc67c 100644 --- a/services/galley/src/Galley/External/LegalHoldService.hs +++ b/services/galley/src/Galley/External/LegalHoldService.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE GeneralizedNewtypeDeriving #-} - -- This file is part of the Wire Server implementation. -- -- Copyright (C) 2020 Wire Swiss GmbH @@ -26,9 +24,6 @@ module Galley.External.LegalHoldService -- * helpers validateServiceKey, - - -- * types - OpaqueAuthToken (..), ) where @@ -50,6 +45,8 @@ import Data.Misc import Galley.API.Error import Galley.App import qualified Galley.Data.LegalHold as LegalHoldData +import Galley.Env +import Galley.External.LegalHoldService.Types import Imports import qualified Network.HTTP.Client as Http import Network.HTTP.Types @@ -237,14 +234,3 @@ validateServiceKey pem = (SSL.readPublicKey (LC8.unpack (toByteString pem)) >>= return . Just) minRsaKeySize :: Int minRsaKeySize = 256 -- Bytes (= 2048 bits) - --- Types - --- | When receiving tokens from other services which are 'just passing through' --- it's error-prone useless extra work to parse and render them from JSON over and over again. --- We'll just wrap them with this to give some level of typesafety and a reasonable JSON --- instance -newtype OpaqueAuthToken = OpaqueAuthToken - { opaqueAuthTokenToText :: Text - } - deriving newtype (Eq, Show, FromJSON, ToJSON, ToByteString) diff --git a/services/galley/src/Galley/External/LegalHoldService/Types.hs b/services/galley/src/Galley/External/LegalHoldService/Types.hs new file mode 100644 index 0000000000..cecf37ad87 --- /dev/null +++ b/services/galley/src/Galley/External/LegalHoldService/Types.hs @@ -0,0 +1,36 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2020 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.External.LegalHoldService.Types + ( OpaqueAuthToken (..), + ) +where + +import Data.Aeson +import Data.ByteString.Conversion.To +import Imports + +-- | When receiving tokens from other services which are 'just passing through' +-- it's error-prone useless extra work to parse and render them from JSON over and over again. +-- We'll just wrap them with this to give some level of typesafety and a reasonable JSON +-- instance +newtype OpaqueAuthToken = OpaqueAuthToken + { opaqueAuthTokenToText :: Text + } + deriving newtype (Eq, Show, FromJSON, ToJSON, ToByteString) diff --git a/services/galley/src/Galley/Intra/Client.hs b/services/galley/src/Galley/Intra/Client.hs index f0a941d0a3..52a783513c 100644 --- a/services/galley/src/Galley/Intra/Client.hs +++ b/services/galley/src/Galley/Intra/Client.hs @@ -38,24 +38,26 @@ import Data.Misc import qualified Data.Set as Set import Data.Text.Encoding import Galley.API.Error -import Galley.App import Galley.Effects -import Galley.External.LegalHoldService +import Galley.Env +import Galley.External.LegalHoldService.Types import Galley.Intra.Util import Imports import Network.HTTP.Types.Method import Network.HTTP.Types.Status import Network.Wai.Utilities.Error +import Polysemy +import qualified Polysemy.Reader as P +import qualified Polysemy.TinyLog as P import qualified System.Logger.Class as Logger import Wire.API.User.Client (UserClients, UserClientsFull, filterClients, filterClientsFull) -- | Calls 'Brig.API.internalListClientsH'. -lookupClients :: Member BrigAccess r => [UserId] -> Galley r UserClients +lookupClients :: [UserId] -> IntraM UserClients lookupClients uids = do - (brigHost, brigPort) <- brigReq r <- - callBrig $ - method POST . host brigHost . port brigPort + call Brig $ + method POST . path "/i/clients" . json (UserSet $ Set.fromList uids) . expect2xx @@ -64,14 +66,12 @@ lookupClients uids = do -- | Calls 'Brig.API.internalListClientsFullH'. lookupClientsFull :: - Member BrigAccess r => [UserId] -> - Galley r UserClientsFull + IntraM UserClientsFull lookupClientsFull uids = do - (brigHost, brigPort) <- brigReq r <- - callBrig $ - method POST . host brigHost . port brigPort + call Brig $ + method POST . path "/i/clients/full" . json (UserSet $ Set.fromList uids) . expect2xx @@ -80,52 +80,44 @@ lookupClientsFull uids = do -- | Calls 'Brig.API.legalHoldClientRequestedH'. notifyClientsAboutLegalHoldRequest :: - Member BrigAccess r => UserId -> UserId -> LastPrekey -> - Galley r () + IntraM () notifyClientsAboutLegalHoldRequest requesterUid targetUid lastPrekey' = do - (brigHost, brigPort) <- brigReq - void . callBrig $ + void . call Brig $ method POST - . host brigHost - . port brigPort . paths ["i", "clients", "legalhold", toByteString' targetUid, "request"] . json (LegalHoldClientRequest requesterUid lastPrekey') . expect2xx -- | Calls 'Brig.User.API.Auth.legalHoldLoginH'. getLegalHoldAuthToken :: - Member BrigAccess r => + Members '[Embed IO, P.TinyLog, P.Reader Env] r => UserId -> Maybe PlainTextPassword -> - Galley r OpaqueAuthToken + Sem r OpaqueAuthToken getLegalHoldAuthToken uid pw = do - (brigHost, brigPort) <- brigReq r <- - callBrig $ + embedIntra . call Brig $ method POST - . host brigHost - . port brigPort . path "/i/legalhold-login" . queryItem "persist" "true" . json (LegalHoldLogin uid pw Nothing) . expect2xx case getCookieValue "zuid" r of Nothing -> do - Logger.warn $ Logger.msg @Text "Response from login missing auth cookie" - throwM internalError + P.warn $ Logger.msg @Text "Response from login missing auth cookie" + embed $ throwM internalError Just c -> pure . OpaqueAuthToken . decodeUtf8 $ c -- | Calls 'Brig.API.addClientInternalH'. addLegalHoldClientToUser :: - Member BrigAccess r => UserId -> ConnId -> [Prekey] -> LastPrekey -> - Galley r ClientId + IntraM ClientId addLegalHoldClientToUser uid connId prekeys lastPrekey' = do clientId <$> brigAddClient uid connId lhClient where @@ -143,28 +135,21 @@ addLegalHoldClientToUser uid connId prekeys lastPrekey' = do -- | Calls 'Brig.API.removeLegalHoldClientH'. removeLegalHoldClientFromUser :: - Member BrigAccess r => UserId -> - Galley r () + IntraM () removeLegalHoldClientFromUser targetUid = do - (brigHost, brigPort) <- brigReq - void . callBrig $ + void . call Brig $ method DELETE - . host brigHost - . port brigPort . paths ["i", "clients", "legalhold", toByteString' targetUid] . contentJson . expect2xx -- | Calls 'Brig.API.addClientInternalH'. -brigAddClient :: Member BrigAccess r => UserId -> ConnId -> NewClient -> Galley r Client +brigAddClient :: UserId -> ConnId -> NewClient -> IntraM Client brigAddClient uid connId client = do - (brigHost, brigPort) <- brigReq r <- - callBrig $ + call Brig $ method POST - . host brigHost - . port brigPort . header "Z-Connection" (toByteString' connId) . paths ["i", "clients", toByteString' uid] . contentJson diff --git a/services/galley/src/Galley/Intra/Effects.hs b/services/galley/src/Galley/Intra/Effects.hs new file mode 100644 index 0000000000..26191832bf --- /dev/null +++ b/services/galley/src/Galley/Intra/Effects.hs @@ -0,0 +1,95 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Intra.Effects + ( interpretBrigAccess, + interpretSparAccess, + interpretBotAccess, + interpretGundeckAccess, + ) +where + +import Galley.Effects.BotAccess (BotAccess (..)) +import Galley.Effects.BrigAccess (BrigAccess (..)) +import Galley.Effects.GundeckAccess (GundeckAccess (..)) +import Galley.Effects.SparAccess (SparAccess (..)) +import Galley.Env +import Galley.Intra.Client +import qualified Galley.Intra.Push.Internal as G +import Galley.Intra.Spar +import Galley.Intra.Team +import Galley.Intra.User +import Galley.Intra.Util +import Imports +import Polysemy +import qualified Polysemy.Reader as P +import qualified Polysemy.TinyLog as P +import qualified UnliftIO + +interpretBrigAccess :: + Members '[Embed IO, P.TinyLog, P.Reader Env] r => + Sem (BrigAccess ': r) a -> + Sem r a +interpretBrigAccess = interpret $ \case + GetConnectionsUnqualified uids muids mrel -> + embedIntra $ getConnectionsUnqualified uids muids mrel + GetConnectionsUnqualifiedBidi uids1 uids2 mrel1 mrel2 -> + embedIntra $ + UnliftIO.concurrently + (getConnectionsUnqualified uids1 (Just uids2) mrel1) + (getConnectionsUnqualified uids2 (Just uids1) mrel2) + GetConnections uids mquids mrel -> + embedIntra $ + getConnections uids mquids mrel + PutConnectionInternal uc -> embedIntra $ putConnectionInternal uc + ReauthUser uid reauth -> embedIntra $ reAuthUser uid reauth + LookupActivatedUsers uids -> embedIntra $ lookupActivatedUsers uids + GetUsers uids -> embedIntra $ getUsers uids + DeleteUser uid -> embedIntra $ deleteUser uid + GetContactList uid -> embedIntra $ getContactList uid + GetRichInfoMultiUser uids -> embedIntra $ getRichInfoMultiUser uids + GetSize tid -> embedIntra $ getSize tid + LookupClients uids -> embedIntra $ lookupClients uids + LookupClientsFull uids -> embedIntra $ lookupClientsFull uids + NotifyClientsAboutLegalHoldRequest self other pk -> + embedIntra $ notifyClientsAboutLegalHoldRequest self other pk + GetLegalHoldAuthToken uid mpwd -> getLegalHoldAuthToken uid mpwd + AddLegalHoldClientToUser uid conn pks lpk -> + embedIntra $ addLegalHoldClientToUser uid conn pks lpk + RemoveLegalHoldClientFromUser uid -> + embedIntra $ removeLegalHoldClientFromUser uid + +interpretSparAccess :: + Members '[Embed IO, P.Reader Env] r => + Sem (SparAccess ': r) a -> + Sem r a +interpretSparAccess = interpret $ \case + DeleteTeam tid -> embedIntra $ deleteTeam tid + +interpretBotAccess :: + Members '[Embed IO, P.Reader Env] r => + Sem (BotAccess ': r) a -> + Sem r a +interpretBotAccess = interpret $ \case + DeleteBot cid bid -> embedIntra $ deleteBot cid bid + +interpretGundeckAccess :: + Members '[Embed IO, P.TinyLog, P.Reader Env] r => + Sem (GundeckAccess ': r) a -> + Sem r a +interpretGundeckAccess = interpret $ \case + Push ps -> embedIntra $ G.push ps diff --git a/services/galley/src/Galley/Intra/Federator.hs b/services/galley/src/Galley/Intra/Federator.hs new file mode 100644 index 0000000000..cd08fb3257 --- /dev/null +++ b/services/galley/src/Galley/Intra/Federator.hs @@ -0,0 +1,74 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2020 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Intra.Federator (interpretFederatorAccess) where + +import Control.Monad.Except +import Data.Qualified +import Galley.Effects.FederatorAccess (FederatorAccess (..)) +import Galley.Env +import Galley.Intra.Federator.Types +import Imports +import Polysemy +import qualified Polysemy.Reader as P +import UnliftIO +import Wire.API.Federation.Client +import Wire.API.Federation.Error + +embedFederationM :: + Members '[Embed IO, P.Reader Env] r => + FederationM a -> + Sem r a +embedFederationM action = do + env <- P.ask + embed $ runFederationM env action + +interpretFederatorAccess :: + Members '[Embed IO, P.Reader Env] r => + Sem (FederatorAccess ': r) a -> + Sem r a +interpretFederatorAccess = interpret $ \case + RunFederated dom rpc -> embedFederationM $ runFederated dom rpc + RunFederatedEither dom rpc -> embedFederationM $ runFederatedEither dom rpc + RunFederatedConcurrently rs f -> embedFederationM $ runFederatedConcurrently rs f + +runFederatedEither :: + Remote x -> + FederatedRPC c a -> + FederationM (Either FederationError a) +runFederatedEither (tDomain -> remoteDomain) rpc = do + env <- ask + liftIO $ runFederationM env (runExceptT (executeFederated remoteDomain rpc)) + +runFederated :: + Remote x -> + FederatedRPC c a -> + FederationM a +runFederated dom rpc = + runFederatedEither dom rpc + >>= either (throwIO . federationErrorToWai) pure + +runFederatedConcurrently :: + (Foldable f, Functor f) => + f (Remote a) -> + (Remote [a] -> FederatedRPC c b) -> + FederationM [Remote b] +runFederatedConcurrently xs rpc = + pooledForConcurrentlyN 8 (bucketRemote xs) $ \r -> + qualifyAs r <$> runFederated r (rpc r) diff --git a/services/galley/src/Galley/Intra/Federator/Types.hs b/services/galley/src/Galley/Intra/Federator/Types.hs new file mode 100644 index 0000000000..44f43d5321 --- /dev/null +++ b/services/galley/src/Galley/Intra/Federator/Types.hs @@ -0,0 +1,54 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2020 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Intra.Federator.Types + ( FederatedRPC, + FederationM, + runFederationM, + ) +where + +import Control.Lens +import Control.Monad.Except +import Galley.Env +import Galley.Options +import Imports +import Wire.API.Federation.Client +import Wire.API.Federation.GRPC.Types + +type FederatedRPC (c :: Component) = + FederatorClient c (ExceptT FederationClientFailure FederationM) + +newtype FederationM a = FederationM + {unFederationM :: ReaderT Env IO a} + deriving + ( Functor, + Applicative, + Monad, + MonadIO, + MonadReader Env, + MonadUnliftIO + ) + +runFederationM :: Env -> FederationM a -> IO a +runFederationM env = flip runReaderT env . unFederationM + +instance HasFederatorConfig FederationM where + federatorEndpoint = view federator + federationDomain = view (options . optSettings . setFederationDomain) diff --git a/services/galley/src/Galley/Intra/Push.hs b/services/galley/src/Galley/Intra/Push.hs index 15f6707614..71292655ac 100644 --- a/services/galley/src/Galley/Intra/Push.hs +++ b/services/galley/src/Galley/Intra/Push.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE StrictData #-} - -- This file is part of the Wire Server implementation. -- -- Copyright (C) 2020 Wire Swiss GmbH @@ -25,9 +23,6 @@ module Galley.Intra.Push newConversationEventPush, newPush1, newPushLocal1, - push, - push1, - pushSome, PushEvent (..), -- * Push Configuration @@ -51,211 +46,5 @@ module Galley.Intra.Push ) where -import Bilge hiding (options) -import Bilge.RPC -import Bilge.Retry -import Control.Lens (makeLenses, set, view, (.~), (^.)) -import Control.Monad.Catch -import Control.Retry -import Data.Aeson (Object) -import Data.Domain -import Data.Id (ConnId, UserId) -import Data.Json.Util -import Data.List.Extra (chunksOf) -import Data.List.NonEmpty (nonEmpty) -import Data.List1 -import Data.Misc -import Data.Qualified -import Data.Range -import qualified Data.Set as Set -import Data.Text.Encoding (encodeUtf8) -import qualified Data.Text.Lazy as LT -import Galley.App -import Galley.Effects -import Galley.Options -import Galley.Types -import qualified Galley.Types.Teams as Teams -import Gundeck.Types.Push.V2 (RecipientClients (..)) +import Galley.Intra.Push.Internal import qualified Gundeck.Types.Push.V2 as Gundeck -import Imports hiding (forkIO) -import Network.HTTP.Types.Method -import Safe (headDef, tailDef) -import System.Logger.Class hiding (new) -import UnliftIO.Async (mapConcurrently) -import UnliftIO.Concurrent (forkIO) -import Util.Options -import qualified Wire.API.Event.FeatureConfig as FeatureConfig - -data PushEvent - = ConvEvent Event - | TeamEvent Teams.Event - | FeatureConfigEvent FeatureConfig.Event - -pushEventJson :: PushEvent -> Object -pushEventJson (ConvEvent e) = toJSONObject e -pushEventJson (TeamEvent e) = toJSONObject e -pushEventJson (FeatureConfigEvent e) = toJSONObject e - -type Recipient = RecipientBy UserId - -data RecipientBy user = Recipient - { _recipientUserId :: user, - _recipientClients :: RecipientClients - } - deriving stock (Functor, Foldable, Traversable) - -makeLenses ''RecipientBy - -recipient :: LocalMember -> Recipient -recipient = userRecipient . lmId - -userRecipient :: user -> RecipientBy user -userRecipient u = Recipient u RecipientClientsAll - -type Push = PushTo UserId - -data PushTo user = Push - { _pushConn :: Maybe ConnId, - _pushTransient :: Bool, - _pushRoute :: Gundeck.Route, - _pushNativePriority :: Maybe Gundeck.Priority, - _pushAsync :: Bool, - pushOrigin :: Maybe UserId, - _pushRecipients :: List1 (RecipientBy user), - pushJson :: Object, - pushRecipientListType :: Teams.ListType - } - deriving stock (Functor, Foldable, Traversable) - -makeLenses ''PushTo - -newPush1 :: Teams.ListType -> Maybe UserId -> PushEvent -> List1 Recipient -> Push -newPush1 recipientListType from e rr = - Push - { _pushConn = Nothing, - _pushTransient = False, - _pushRoute = Gundeck.RouteAny, - _pushNativePriority = Nothing, - _pushAsync = False, - pushRecipientListType = recipientListType, - pushJson = pushEventJson e, - pushOrigin = from, - _pushRecipients = rr - } - -newPushLocal1 :: Teams.ListType -> UserId -> PushEvent -> List1 Recipient -> Push -newPushLocal1 lt uid e rr = newPush1 lt (Just uid) e rr - -newPush :: Teams.ListType -> Maybe UserId -> PushEvent -> [Recipient] -> Maybe Push -newPush _ _ _ [] = Nothing -newPush t u e (r : rr) = Just $ newPush1 t u e (list1 r rr) - -newPushLocal :: Teams.ListType -> UserId -> PushEvent -> [Recipient] -> Maybe Push -newPushLocal lt uid e rr = newPush lt (Just uid) e rr - -newConversationEventPush :: Domain -> Event -> [UserId] -> Maybe Push -newConversationEventPush localDomain e users = - let musr = guard (localDomain == qDomain (evtFrom e)) $> qUnqualified (evtFrom e) - in newPush Teams.ListComplete musr (ConvEvent e) (map userRecipient users) - --- | Asynchronously send a single push, chunking it into multiple --- requests if there are more than 128 recipients. -push1 :: Member GundeckAccess r => Push -> Galley r () -push1 p = push (list1 p []) - -pushSome :: Member GundeckAccess r => [Push] -> Galley r () -pushSome [] = return () -pushSome (x : xs) = push (list1 x xs) - -push :: Member GundeckAccess r => List1 Push -> Galley r () -push ps = do - let (localPushes, remotePushes) = foldMap (bimap toList toList . splitPush) (toList ps) - traverse_ (pushLocal . List1) (nonEmpty localPushes) - traverse_ (pushRemote . List1) (nonEmpty remotePushes) - where - splitPush :: Push -> (Maybe (PushTo UserId), Maybe (PushTo UserId)) - splitPush p = - (mkPushTo localRecipients p, mkPushTo remoteRecipients p) - where - localRecipients = toList $ _pushRecipients p - remoteRecipients = [] -- FUTUREWORK: deal with remote sending - mkPushTo :: [RecipientBy a] -> PushTo b -> Maybe (PushTo a) - mkPushTo recipients p = - nonEmpty recipients <&> \nonEmptyRecipients -> - p {_pushRecipients = List1 nonEmptyRecipients} - --- | Asynchronously send multiple pushes, aggregating them into as --- few requests as possible, such that no single request targets --- more than 128 recipients. -pushLocal :: Member GundeckAccess r => List1 (PushTo UserId) -> Galley r () -pushLocal ps = do - limit <- fanoutLimit - opts <- view options - -- Do not fan out for very large teams - let (asyncs, sync) = partition _pushAsync (removeIfLargeFanout limit $ toList ps) - forM_ (pushes asyncs) $ callAsync "gundeck" . gundeckReq opts - void . liftGalley0 $ mapConcurrently (call0 "gundeck" . gundeckReq opts) (pushes sync) - return () - where - pushes = fst . foldr chunk ([], 0) - chunk p (pss, !n) = - let r = recipientList p - nr = length r - in if n + nr > maxRecipients - then - let pss' = map (pure . toPush p) (chunksOf maxRecipients r) - in (pss' ++ pss, 0) - else - let hd = headDef [] pss - tl = tailDef [] pss - in ((toPush p r : hd) : tl, n + nr) - maxRecipients = 128 - recipientList p = map (toRecipient p) . toList $ _pushRecipients p - toPush p r = - let pload = Gundeck.singletonPayload (pushJson p) - in Gundeck.newPush (pushOrigin p) (unsafeRange (Set.fromList r)) pload - & Gundeck.pushOriginConnection .~ _pushConn p - & Gundeck.pushTransient .~ _pushTransient p - & maybe id (set Gundeck.pushNativePriority) (_pushNativePriority p) - toRecipient p r = - Gundeck.recipient (_recipientUserId r) (_pushRoute p) - & Gundeck.recipientClients .~ _recipientClients r - -- Ensure that under no circumstances we exceed the threshold - removeIfLargeFanout limit = - filter - ( \p -> - (pushRecipientListType p == Teams.ListComplete) - && (length (_pushRecipients p) <= (fromIntegral $ fromRange limit)) - ) - --- instead of IdMapping, we could also just take qualified IDs -pushRemote :: List1 (PushTo UserId) -> Galley r () -pushRemote _ps = do - -- FUTUREWORK(federation, #1261): send these to the other backends - pure () - ------------------------------------------------------------------------------ --- Helpers - -gundeckReq :: Opts -> [Gundeck.Push] -> Request -> Request -gundeckReq o ps = - host (encodeUtf8 $ o ^. optGundeck . epHost) - . port (portNumber $ fromIntegral (o ^. optGundeck . epPort)) - . method POST - . path "/i/push/v2" - . json ps - . expect2xx - -callAsync :: Member GundeckAccess r => LT.Text -> (Request -> Request) -> Galley r () -callAsync n r = liftGalley0 . void . forkIO $ void (call0 n r) `catches` handlers - where - handlers = - [ Handler $ \(x :: RPCException) -> err (rpcExceptionMsg x), - Handler $ \(x :: SomeException) -> err $ "remote" .= n ~~ msg (show x) - ] - -call0 :: LT.Text -> (Request -> Request) -> Galley0 (Response (Maybe LByteString)) -call0 n r = recovering x3 rpcHandlers (const (rpc n r)) - -x3 :: RetryPolicy -x3 = limitRetries 3 <> exponentialBackoff 100000 diff --git a/services/galley/src/Galley/Intra/Push/Internal.hs b/services/galley/src/Galley/Intra/Push/Internal.hs new file mode 100644 index 0000000000..6c4c7aefbc --- /dev/null +++ b/services/galley/src/Galley/Intra/Push/Internal.hs @@ -0,0 +1,168 @@ +{-# LANGUAGE StrictData #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2021 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Galley.Intra.Push.Internal where + +import Bilge hiding (options) +import Control.Lens (makeLenses, set, view, (.~)) +import Data.Aeson (Object) +import Data.Domain +import Data.Id (ConnId, UserId) +import Data.Json.Util +import Data.List.Extra (chunksOf) +import Data.List.NonEmpty (NonEmpty, nonEmpty) +import Data.List1 +import Data.Qualified +import Data.Range +import qualified Data.Set as Set +import Galley.Env +import Galley.Intra.Util +import Galley.Types +import qualified Galley.Types.Teams as Teams +import Gundeck.Types.Push.V2 (RecipientClients (..)) +import qualified Gundeck.Types.Push.V2 as Gundeck +import Imports hiding (forkIO) +import Safe (headDef, tailDef) +import UnliftIO.Async (mapConcurrently) +import qualified Wire.API.Event.FeatureConfig as FeatureConfig + +data PushEvent + = ConvEvent Event + | TeamEvent Teams.Event + | FeatureConfigEvent FeatureConfig.Event + +pushEventJson :: PushEvent -> Object +pushEventJson (ConvEvent e) = toJSONObject e +pushEventJson (TeamEvent e) = toJSONObject e +pushEventJson (FeatureConfigEvent e) = toJSONObject e + +data RecipientBy user = Recipient + { _recipientUserId :: user, + _recipientClients :: RecipientClients + } + deriving stock (Functor, Foldable, Traversable) + +makeLenses ''RecipientBy + +type Recipient = RecipientBy UserId + +data PushTo user = Push + { _pushConn :: Maybe ConnId, + _pushTransient :: Bool, + _pushRoute :: Gundeck.Route, + _pushNativePriority :: Maybe Gundeck.Priority, + _pushAsync :: Bool, + pushOrigin :: Maybe UserId, + _pushRecipients :: List1 (RecipientBy user), + pushJson :: Object, + pushRecipientListType :: Teams.ListType + } + deriving stock (Functor, Foldable, Traversable) + +makeLenses ''PushTo + +type Push = PushTo UserId + +push :: Foldable f => f Push -> IntraM () +push ps = do + let pushes = foldMap (toList . mkPushTo) ps + traverse_ pushLocal (nonEmpty pushes) + where + mkPushTo :: PushTo a -> Maybe (PushTo a) + mkPushTo p = + nonEmpty (toList (_pushRecipients p)) <&> \nonEmptyRecipients -> + p {_pushRecipients = List1 nonEmptyRecipients} + +-- | Asynchronously send multiple pushes, aggregating them into as +-- few requests as possible, such that no single request targets +-- more than 128 recipients. +pushLocal :: NonEmpty (PushTo UserId) -> IntraM () +pushLocal ps = do + opts <- view options + let limit = currentFanoutLimit opts + -- Do not fan out for very large teams + let (asyncs, syncs) = partition _pushAsync (removeIfLargeFanout limit $ toList ps) + traverse_ (asyncCall Gundeck . json) (pushes asyncs) + void $ mapConcurrently (call Gundeck . json) (pushes syncs) + where + pushes = fst . foldr chunk ([], 0) + chunk p (pss, !n) = + let r = recipientList p + nr = length r + in if n + nr > maxRecipients + then + let pss' = map (pure . toPush p) (chunksOf maxRecipients r) + in (pss' ++ pss, 0) + else + let hd = headDef [] pss + tl = tailDef [] pss + in ((toPush p r : hd) : tl, n + nr) + maxRecipients = 128 + recipientList p = map (toRecipient p) . toList $ _pushRecipients p + toPush p r = + let pload = Gundeck.singletonPayload (pushJson p) + in Gundeck.newPush (pushOrigin p) (unsafeRange (Set.fromList r)) pload + & Gundeck.pushOriginConnection .~ _pushConn p + & Gundeck.pushTransient .~ _pushTransient p + & maybe id (set Gundeck.pushNativePriority) (_pushNativePriority p) + toRecipient p r = + Gundeck.recipient (_recipientUserId r) (_pushRoute p) + & Gundeck.recipientClients .~ _recipientClients r + -- Ensure that under no circumstances we exceed the threshold + removeIfLargeFanout limit = + filter + ( \p -> + (pushRecipientListType p == Teams.ListComplete) + && (length (_pushRecipients p) <= (fromIntegral $ fromRange limit)) + ) + +recipient :: LocalMember -> Recipient +recipient = userRecipient . lmId + +userRecipient :: user -> RecipientBy user +userRecipient u = Recipient u RecipientClientsAll + +newPush1 :: Teams.ListType -> Maybe UserId -> PushEvent -> List1 Recipient -> Push +newPush1 recipientListType from e rr = + Push + { _pushConn = Nothing, + _pushTransient = False, + _pushRoute = Gundeck.RouteAny, + _pushNativePriority = Nothing, + _pushAsync = False, + pushRecipientListType = recipientListType, + pushJson = pushEventJson e, + pushOrigin = from, + _pushRecipients = rr + } + +newPushLocal1 :: Teams.ListType -> UserId -> PushEvent -> List1 Recipient -> Push +newPushLocal1 lt uid e rr = newPush1 lt (Just uid) e rr + +newPush :: Teams.ListType -> Maybe UserId -> PushEvent -> [Recipient] -> Maybe Push +newPush _ _ _ [] = Nothing +newPush t u e (r : rr) = Just $ newPush1 t u e (list1 r rr) + +newPushLocal :: Teams.ListType -> UserId -> PushEvent -> [Recipient] -> Maybe Push +newPushLocal lt uid e rr = newPush lt (Just uid) e rr + +newConversationEventPush :: Domain -> Event -> [UserId] -> Maybe Push +newConversationEventPush localDomain e users = + let musr = guard (localDomain == qDomain (evtFrom e)) $> qUnqualified (evtFrom e) + in newPush Teams.ListComplete musr (ConvEvent e) (map userRecipient users) diff --git a/services/galley/src/Galley/Intra/Spar.hs b/services/galley/src/Galley/Intra/Spar.hs index c10f3109d3..ce9f569a60 100644 --- a/services/galley/src/Galley/Intra/Spar.hs +++ b/services/galley/src/Galley/Intra/Spar.hs @@ -23,19 +23,14 @@ where import Bilge import Data.ByteString.Conversion import Data.Id -import Galley.App -import Galley.Effects import Galley.Intra.Util import Imports import Network.HTTP.Types.Method -- | Notify Spar that a team is being deleted. -deleteTeam :: Member SparAccess r => TeamId -> Galley r () +deleteTeam :: TeamId -> IntraM () deleteTeam tid = do - (h, p) <- sparReq - _ <- - callSpar $ - method DELETE . host h . port p - . paths ["i", "teams", toByteString' tid] - . expect2xx - pure () + void . call Spar $ + method DELETE + . paths ["i", "teams", toByteString' tid] + . expect2xx diff --git a/services/galley/src/Galley/Intra/Team.hs b/services/galley/src/Galley/Intra/Team.hs index 50cdcdd345..a6b8d96af1 100644 --- a/services/galley/src/Galley/Intra/Team.hs +++ b/services/galley/src/Galley/Intra/Team.hs @@ -1,6 +1,6 @@ -- This file is part of the Wire Server implementation. -- --- Copyright (C) 2020 Wire Swiss GmbH +-- Copyright (C) 2021 Wire Swiss GmbH -- -- This program is free software: you can redistribute it and/or modify it under -- the terms of the GNU Affero General Public License as published by the Free @@ -22,20 +22,17 @@ import Bilge.RPC import Brig.Types.Team import Data.ByteString.Conversion import Data.Id -import Galley.App -import Galley.Effects import Galley.Intra.Util import Imports import Network.HTTP.Types.Method import Network.HTTP.Types.Status import Network.Wai.Utilities.Error -getSize :: Member BrigAccess r => TeamId -> Galley r TeamSize +getSize :: TeamId -> IntraM TeamSize getSize tid = do - (h, p) <- brigReq r <- - callBrig $ - method GET . host h . port p + call Brig $ + method GET . paths ["/i/teams", toByteString' tid, "size"] . expect2xx parseResponse (mkError status502 "server-error") r diff --git a/services/galley/src/Galley/Intra/User.hs b/services/galley/src/Galley/Intra/User.hs index faea13c43e..0a08a634e0 100644 --- a/services/galley/src/Galley/Intra/User.hs +++ b/services/galley/src/Galley/Intra/User.hs @@ -17,28 +17,23 @@ module Galley.Intra.User ( getConnections, - getConnectionsUnqualified0, getConnectionsUnqualified, putConnectionInternal, deleteBot, reAuthUser, lookupActivatedUsers, - getUser, getUsers, deleteUser, getContactList, chunkify, getRichInfoMultiUser, - - -- * Internal - deleteBot0, ) where import Bilge hiding (getHeader, options, statusCode) import Bilge.RPC import Brig.Types.Connection (Relation (..), UpdateConnectionsInternal (..), UserIds (..)) -import Brig.Types.Intra +import qualified Brig.Types.Intra as Brig import Brig.Types.User (User) import Control.Monad.Catch (throwM) import Data.ByteString.Char8 (pack) @@ -46,8 +41,6 @@ import qualified Data.ByteString.Char8 as BSC import Data.ByteString.Conversion import Data.Id import Data.Qualified -import Galley.App -import Galley.Effects import Galley.Intra.Util import Imports import Network.HTTP.Client (HttpExceptionContent (..)) @@ -65,24 +58,14 @@ import Wire.API.User.RichInfo (RichInfo) -- When a connection does not exist, it is skipped. -- Calls 'Brig.API.Internal.getConnectionsStatusUnqualified'. getConnectionsUnqualified :: - Member BrigAccess r => - [UserId] -> - Maybe [UserId] -> - Maybe Relation -> - Galley r [ConnectionStatus] -getConnectionsUnqualified uFrom uTo rlt = - liftGalley0 $ getConnectionsUnqualified0 uFrom uTo rlt - -getConnectionsUnqualified0 :: [UserId] -> Maybe [UserId] -> Maybe Relation -> - Galley0 [ConnectionStatus] -getConnectionsUnqualified0 uFrom uTo rlt = do - (h, p) <- brigReq + IntraM [ConnectionStatus] +getConnectionsUnqualified uFrom uTo rlt = do r <- - call0 "brig" $ - method POST . host h . port p + call Brig $ + method POST . path "/i/users/connections-status" . maybe id rfilter rlt . json ConnectionsStatusRequest {csrFrom = uFrom, csrTo = uTo} @@ -97,53 +80,57 @@ getConnectionsUnqualified0 uFrom uTo rlt = do -- -- When a connection does not exist, it is skipped. -- Calls 'Brig.API.Internal.getConnectionsStatus'. -getConnections :: Member BrigAccess r => [UserId] -> Maybe [Qualified UserId] -> Maybe Relation -> Galley r [ConnectionStatusV2] +getConnections :: + [UserId] -> + Maybe [Qualified UserId] -> + Maybe Relation -> + IntraM [ConnectionStatusV2] getConnections [] _ _ = pure [] getConnections uFrom uTo rlt = do - (h, p) <- brigReq r <- - callBrig $ - method POST . host h . port p + call Brig $ + method POST . path "/i/users/connections-status/v2" . json (ConnectionsStatusRequestV2 uFrom uTo rlt) . expect2xx parseResponse (mkError status502 "server-error") r -putConnectionInternal :: Member BrigAccess r => UpdateConnectionsInternal -> Galley r Status +putConnectionInternal :: + UpdateConnectionsInternal -> + IntraM Status putConnectionInternal updateConn = do - (h, p) <- brigReq response <- - callBrig $ - method PUT . host h . port p + call Brig $ + method PUT . paths ["/i/connections/connection-update"] . json updateConn pure $ responseStatus response -deleteBot0 :: ConvId -> BotId -> Galley0 () -deleteBot0 cid bot = do - (h, p) <- brigReq +deleteBot :: + ConvId -> + BotId -> + IntraM () +deleteBot cid bot = do void $ - call0 "brig" $ - method DELETE . host h . port p + call Brig $ + method DELETE . path "/bot/self" . header "Z-Type" "bot" . header "Z-Bot" (toByteString' bot) . header "Z-Conversation" (toByteString' cid) . expect2xx --- | Calls 'Brig.Provider.API.botGetSelfH'. -deleteBot :: Member BotAccess r => ConvId -> BotId -> Galley r () -deleteBot cid bot = liftGalley0 $ deleteBot0 cid bot - -- | Calls 'Brig.User.API.Auth.reAuthUserH'. -reAuthUser :: Member BrigAccess r => UserId -> ReAuthUser -> Galley r Bool +reAuthUser :: + UserId -> + Brig.ReAuthUser -> + IntraM Bool reAuthUser uid auth = do - (h, p) <- brigReq let req = - method GET . host h . port p + method GET . paths ["/i/users", toByteString' uid, "reauthenticate"] . json auth - st <- statusCode . responseStatus <$> callBrig (check [status200, status403] . req) + st <- statusCode . responseStatus <$> call Brig (check [status200, status403] . req) return $ st == 200 check :: [Status] -> Request -> Request @@ -156,13 +143,12 @@ check allowed r = } -- | Calls 'Brig.API.listActivatedAccountsH'. -lookupActivatedUsers :: Member BrigAccess r => [UserId] -> Galley r [User] +lookupActivatedUsers :: [UserId] -> IntraM [User] lookupActivatedUsers = chunkify $ \uids -> do - (h, p) <- brigReq let users = BSC.intercalate "," $ toByteString' <$> uids r <- - callBrig $ - method GET . host h . port p + call Brig $ + method GET . path "/i/users" . queryItem "ids" users . expect2xx @@ -183,49 +169,41 @@ chunkify doChunk keys = mconcat <$> (doChunk `mapM` chunks keys) chunks uids = case splitAt maxSize uids of (h, t) -> h : chunks t -- | Calls 'Brig.API.listActivatedAccountsH'. -getUser :: Member BrigAccess r => UserId -> Galley r (Maybe UserAccount) -getUser uid = listToMaybe <$> getUsers [uid] - --- | Calls 'Brig.API.listActivatedAccountsH'. -getUsers :: Member BrigAccess r => [UserId] -> Galley r [UserAccount] +getUsers :: [UserId] -> IntraM [Brig.UserAccount] getUsers = chunkify $ \uids -> do - (h, p) <- brigReq resp <- - callBrig $ - method GET . host h . port p + call Brig $ + method GET . path "/i/users" . queryItem "ids" (BSC.intercalate "," (toByteString' <$> uids)) . expect2xx pure . fromMaybe [] . responseJsonMaybe $ resp -- | Calls 'Brig.API.deleteUserNoVerifyH'. -deleteUser :: Member BrigAccess r => UserId -> Galley r () +deleteUser :: UserId -> IntraM () deleteUser uid = do - (h, p) <- brigReq void $ - callBrig $ - method DELETE . host h . port p + call Brig $ + method DELETE . paths ["/i/users", toByteString' uid] . expect2xx -- | Calls 'Brig.API.getContactListH'. -getContactList :: Member BrigAccess r => UserId -> Galley r [UserId] +getContactList :: UserId -> IntraM [UserId] getContactList uid = do - (h, p) <- brigReq r <- - callBrig $ - method GET . host h . port p + call Brig $ + method GET . paths ["/i/users", toByteString' uid, "contacts"] . expect2xx cUsers <$> parseResponse (mkError status502 "server-error") r -- | Calls 'Brig.API.Internal.getRichInfoMultiH' -getRichInfoMultiUser :: Member BrigAccess r => [UserId] -> Galley r [(UserId, RichInfo)] +getRichInfoMultiUser :: [UserId] -> IntraM [(UserId, RichInfo)] getRichInfoMultiUser = chunkify $ \uids -> do - (h, p) <- brigReq resp <- - callBrig $ - method GET . host h . port p + call Brig $ + method GET . paths ["/i/users/rich-info"] . queryItem "ids" (toByteString' (List uids)) . expect2xx diff --git a/services/galley/src/Galley/Intra/Util.hs b/services/galley/src/Galley/Intra/Util.hs index a9dc8ff882..203c6ab390 100644 --- a/services/galley/src/Galley/Intra/Util.hs +++ b/services/galley/src/Galley/Intra/Util.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} + -- This file is part of the Wire Server implementation. -- -- Copyright (C) 2020 Wire Swiss GmbH @@ -16,56 +18,120 @@ -- with this program. If not, see . module Galley.Intra.Util - ( brigReq, - sparReq, - call0, - callBrig, - callSpar, - callBot, - x1, + ( IntraComponent (..), + IntraM, + embedIntra, + call, + asyncCall, ) where import Bilge hiding (getHeader, options, statusCode) import Bilge.RPC import Bilge.Retry -import Control.Lens (view) +import Cassandra (MonadClient (..), runClient) +import Control.Lens (locally, view, (^.)) +import Control.Monad.Catch import Control.Retry import qualified Data.ByteString.Lazy as LB import Data.Misc (portNumber) import Data.Text.Encoding (encodeUtf8) import qualified Data.Text.Lazy as LT -import Galley.App -import Galley.Effects +import Galley.Env import Galley.Options -import Imports +import Imports hiding (log) +import Network.HTTP.Types +import Polysemy +import qualified Polysemy.Reader as P +import System.Logger +import qualified System.Logger.Class as LC import Util.Options -brigReq :: Galley r (ByteString, Word16) -brigReq = do - h <- encodeUtf8 <$> view (options . optBrig . epHost) - p <- portNumber . fromIntegral <$> view (options . optBrig . epPort) - return (h, p) +data IntraComponent = Brig | Spar | Gundeck + deriving (Show) + +componentName :: IntraComponent -> String +componentName Brig = "brig" +componentName Spar = "spar" +componentName Gundeck = "gundeck" + +componentRequest :: IntraComponent -> Opts -> Request -> Request +componentRequest Brig o = + host (encodeUtf8 (o ^. optBrig . epHost)) + . port (portNumber (fromIntegral (o ^. optBrig . epPort))) +componentRequest Spar o = + host (encodeUtf8 (o ^. optSpar . epHost)) + . port (portNumber (fromIntegral (o ^. optSpar . epPort))) +componentRequest Gundeck o = + host (encodeUtf8 $ o ^. optGundeck . epHost) + . port (portNumber $ fromIntegral (o ^. optGundeck . epPort)) + . method POST + . path "/i/push/v2" + . expect2xx -sparReq :: Galley r (ByteString, Word16) -sparReq = do - h <- encodeUtf8 <$> view (options . optSpar . epHost) - p <- portNumber . fromIntegral <$> view (options . optSpar . epPort) - return (h, p) +componentRetryPolicy :: IntraComponent -> RetryPolicy +componentRetryPolicy Brig = x1 +componentRetryPolicy Spar = x1 +componentRetryPolicy Gundeck = x3 --- gundeckReq lives in Galley.Intra.Push +embedIntra :: + Members '[Embed IO, P.Reader Env] r => + IntraM a -> + Sem r a +embedIntra action = do + env <- P.ask + embed $ runHttpT (env ^. manager) (runReaderT (unIntraM action) env) -call0 :: LT.Text -> (Request -> Request) -> Galley0 (Response (Maybe LB.ByteString)) -call0 n r = liftGalley0 $ recovering x1 rpcHandlers (const (rpc n r)) +newtype IntraM a = IntraM {unIntraM :: ReaderT Env Http a} + deriving + ( Functor, + Applicative, + Monad, + MonadIO, + MonadHttp, + MonadThrow, + MonadCatch, + MonadMask, + MonadReader Env, + MonadUnliftIO + ) -callBrig :: Member BrigAccess r => (Request -> Request) -> Galley r (Response (Maybe LB.ByteString)) -callBrig r = liftGalley0 $ call0 "brig" r +instance HasRequestId IntraM where + getRequestId = IntraM $ view reqId -callSpar :: Member SparAccess r => (Request -> Request) -> Galley r (Response (Maybe LB.ByteString)) -callSpar r = liftGalley0 $ call0 "spar" r +instance MonadClient IntraM where + liftClient m = do + cs <- view cstate + liftIO $ runClient cs m + localState f = locally cstate f -callBot :: Member BotAccess r => (Request -> Request) -> Galley r (Response (Maybe LB.ByteString)) -callBot r = liftGalley0 $ call0 "brig" r +instance LC.MonadLogger IntraM where + log lvl m = do + env <- ask + log (env ^. applog) lvl (reqIdMsg (env ^. reqId) . m) + +call :: + IntraComponent -> + (Request -> Request) -> + IntraM (Response (Maybe LB.ByteString)) +call comp r = do + o <- view options + let r0 = componentRequest comp o + let n = LT.pack (componentName comp) + recovering (componentRetryPolicy comp) rpcHandlers (const (rpc n (r . r0))) + +asyncCall :: IntraComponent -> (Request -> Request) -> IntraM () +asyncCall comp req = void $ do + let n = LT.pack (componentName comp) + forkIO $ catches (void (call comp req)) (handlers n) + where + handlers n = + [ Handler $ \(x :: RPCException) -> LC.err (rpcExceptionMsg x), + Handler $ \(x :: SomeException) -> LC.err $ "remote" .= n ~~ msg (show x) + ] x1 :: RetryPolicy x1 = limitRetries 1 + +x3 :: RetryPolicy +x3 = limitRetries 3 <> exponentialBackoff 100000