diff --git a/changelog.d/6-federation/wpb-3867-queue-endpoints b/changelog.d/6-federation/wpb-3867-queue-endpoints new file mode 100644 index 0000000000..b3f9efb6b7 --- /dev/null +++ b/changelog.d/6-federation/wpb-3867-queue-endpoints @@ -0,0 +1 @@ +Constrain which federation endpoints can be used via the queueing federation client diff --git a/changelog.d/6-federation/wpb-3867-unreachable-users b/changelog.d/6-federation/wpb-3867-unreachable-users new file mode 100644 index 0000000000..19dde73310 --- /dev/null +++ b/changelog.d/6-federation/wpb-3867-unreachable-users @@ -0,0 +1 @@ +There is a breaking change in the "on-mls-message-sent" federation endpoint due to queueing. Now that there is retrying because of queueing, the endpoint can no longer respond with a list of unreachable users. diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API.hs b/libs/wire-api-federation/src/Wire/API/Federation/API.hs index f344a80ced..476df18303 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API.hs @@ -37,10 +37,12 @@ import Data.Kind import Data.Proxy import GHC.TypeLits import Imports +import Servant import Servant.Client import Servant.Client.Core import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Cargohold +import Wire.API.Federation.API.Common import Wire.API.Federation.API.Galley import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client @@ -64,6 +66,20 @@ type HasFedEndpoint comp api name = (HasUnsafeFedEndpoint comp api name) -- you to forget about some federated calls. type HasUnsafeFedEndpoint comp api name = 'Just api ~ LookupEndpoint (FedApi comp) name +-- | Constrains which endpoints can be used with FedQueueClient. +-- +-- Since the servant client implementation underlying FedQueueClient is +-- returning a "fake" response consisting of an empty object, we need to make +-- sure that an API type is compatible with an empty response if we want to +-- invoke it using `fedQueueClient` +class HasEmptyResponse api + +instance HasEmptyResponse (Post '[JSON] EmptyResponse) + +instance HasEmptyResponse api => HasEmptyResponse (x :> api) + +instance HasEmptyResponse api => HasEmptyResponse (Named name api) + -- | Return a client for a named endpoint. -- -- This function introduces an 'AddAnnotation' constraint, which is @@ -79,7 +95,11 @@ fedClient = clientIn (Proxy @api) (Proxy @m) fedQueueClient :: forall (comp :: Component) (name :: Symbol) m api. - (HasFedEndpoint comp api name, HasClient m api, m ~ FedQueueClient comp) => + ( HasEmptyResponse api, + HasFedEndpoint comp api name, + HasClient m api, + m ~ FedQueueClient comp + ) => Client m api fedQueueClient = clientIn (Proxy @api) (Proxy @m) diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs index 3c4d1db2fa..783df1a228 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs @@ -40,7 +40,6 @@ import Wire.API.MLS.SubConversation import Wire.API.MakesFederatedCall import Wire.API.Message import Wire.API.Routes.Public.Galley.Messaging -import Wire.API.Unreachable import Wire.API.Util.Aeson (CustomEncoded (..), CustomEncodedLensable (..)) import Wire.Arbitrary (Arbitrary, GenericUniform (..)) @@ -97,7 +96,7 @@ type GalleyApi = ConversationUpdateRequest ConversationUpdateResponse :<|> FedEndpoint "mls-welcome" MLSWelcomeRequest MLSWelcomeResponse - :<|> FedEndpoint "on-mls-message-sent" RemoteMLSMessage RemoteMLSMessageResponse + :<|> FedEndpoint "on-mls-message-sent" RemoteMLSMessage EmptyResponse :<|> FedEndpointWithMods '[ MakesFederatedCall 'Galley "on-conversation-updated", MakesFederatedCall 'Galley "on-mls-message-sent", @@ -419,7 +418,7 @@ data MLSMessageResponse MLSMessageResponseUnreachableBackends (Set Domain) | -- | If the list of unreachable users is non-empty, it corresponds to users -- that an application message could not be sent to. - MLSMessageResponseUpdates [ConversationUpdate] (Maybe UnreachableUsers) + MLSMessageResponseUpdates [ConversationUpdate] | MLSMessageResponseNonFederatingBackends NonFederatingBackends deriving stock (Eq, Show, Generic) deriving (ToJSON, FromJSON) via (CustomEncoded MLSMessageResponse) diff --git a/services/galley/src/Galley/API/Action.hs b/services/galley/src/Galley/API/Action.hs index 2efa6ca4d7..b3ac477ebe 100644 --- a/services/galley/src/Galley/API/Action.hs +++ b/services/galley/src/Galley/API/Action.hs @@ -786,19 +786,22 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do now <- input let lcnv = fmap (.convId) lconv e = conversationActionToEvent tag now quid (tUntagged lcnv) Nothing action - - let mkUpdate uids = + mkUpdate uids = ConversationUpdate now quid (tUnqualified lcnv) uids (SomeConversationAction tag action) - - update <- do - let remoteTargets = toList (bmRemotes targets) - updates <- - enqueueNotificationsConcurrently Q.Persistent remoteTargets $ \ruids -> do + handleError :: FederationError -> Sem r (Maybe ConversationUpdate) + handleError fedErr = + logRemoteNotificationError @"on-conversation-updated" fedErr $> Nothing + + update <- + fmap (fromMaybe (mkUpdate [])) + . (either handleError (pure . asum . map tUnqualified)) + <=< enqueueNotificationsConcurrently Q.Persistent (toList (bmRemotes targets)) + $ \ruids -> do let update = mkUpdate (tUnqualified ruids) -- if notifyOrigDomain is false, filter out user from quid's domain, -- because quid's backend will update local state and notify its users @@ -806,13 +809,6 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do if notifyOrigDomain || tDomain ruids /= qDomain quid then fedQueueClient @'Galley @"on-conversation-updated" update $> Nothing else pure (Just update) - case partitionEithers updates of - (ls :: [Remote ([UserId], FederationError)], rs) -> do - for_ ls $ - logError - "on-conversation-updated" - "An error occurred while communicating with federated server: " - pure $ fromMaybe (mkUpdate []) . asum . map tUnqualified $ rs -- notify local participants and bots pushConversationEvent con e (qualifyAs lcnv (bmLocals targets)) (bmBots targets) @@ -820,13 +816,6 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do -- return both the event and the 'ConversationUpdate' structure corresponding -- to the originating domain (if it is remote) pure $ LocalConversationUpdate e update - where - logError :: String -> String -> Remote (a, FederationError) -> Sem r () - logError field msg e = - P.warn $ - Log.field "federation call" field - . Log.field "domain" (_domainText (tDomain e)) - . Log.msg (msg <> displayException (snd (tUnqualified e))) -- | Update the local database with information on conversation members joining -- or leaving. Finally, push out notifications to local users. diff --git a/services/galley/src/Galley/API/Federation.hs b/services/galley/src/Galley/API/Federation.hs index 18bb120620..8913b86d9e 100644 --- a/services/galley/src/Galley/API/Federation.hs +++ b/services/galley/src/Galley/API/Federation.hs @@ -78,7 +78,7 @@ import Wire.API.Error.Galley import Wire.API.Event.Conversation import Wire.API.Federation.API import Wire.API.Federation.API.Common (EmptyResponse (..)) -import Wire.API.Federation.API.Galley qualified as F +import Wire.API.Federation.API.Galley import Wire.API.Federation.Error import Wire.API.MLS.CommitBundle import Wire.API.MLS.Credential @@ -127,7 +127,7 @@ onClientRemoved :: Member TinyLog r ) => Domain -> - F.ClientRemovedRequest -> + ClientRemovedRequest -> Sem r EmptyResponse onClientRemoved domain req = do let qusr = Qualified req.user domain @@ -136,7 +136,7 @@ onClientRemoved domain req = do mConv <- E.getConversation convId for mConv $ \conv -> do lconv <- qualifyLocal conv - removeClient lconv qusr (F.client req) + removeClient lconv qusr (req.client) pure EmptyResponse onConversationCreated :: @@ -148,17 +148,17 @@ onConversationCreated :: Member P.TinyLog r ) => Domain -> - F.ConversationCreated ConvId -> + ConversationCreated ConvId -> Sem r EmptyResponse onConversationCreated domain rc = do let qrc = fmap (toRemoteUnsafe domain) rc loc <- qualifyLocal () - let (localUserIds, _) = partitionQualified loc (map omQualifiedId (toList (F.nonCreatorMembers rc))) + let (localUserIds, _) = partitionQualified loc (map omQualifiedId (toList (nonCreatorMembers rc))) addedUserIds <- addLocalUsersToRemoteConv - (F.cnvId qrc) - (tUntagged (F.ccRemoteOrigUserId qrc)) + (cnvId qrc) + (tUntagged (ccRemoteOrigUserId qrc)) localUserIds let connectedMembers = @@ -169,16 +169,16 @@ onConversationCreated domain rc = do (const True) . omQualifiedId ) - (F.nonCreatorMembers rc) + (nonCreatorMembers rc) -- Make sure to notify only about local users connected to the adder - let qrcConnected = qrc {F.nonCreatorMembers = connectedMembers} + let qrcConnected = qrc {nonCreatorMembers = connectedMembers} for_ (fromConversationCreated loc qrcConnected) $ \(mem, c) -> do let event = Event - (tUntagged (F.cnvId qrcConnected)) + (tUntagged (cnvId qrcConnected)) Nothing - (tUntagged (F.ccRemoteOrigUserId qrcConnected)) + (tUntagged (ccRemoteOrigUserId qrcConnected)) qrcConnected.time (EdConversation c) pushConversationEvent Nothing event (qualifyAs loc [qUnqualified . Public.memId $ mem]) [] @@ -189,12 +189,12 @@ getConversations :: Member (Input (Local ())) r ) => Domain -> - F.GetConversationsRequest -> - Sem r F.GetConversationsResponse -getConversations domain (F.GetConversationsRequest uid cids) = do + GetConversationsRequest -> + Sem r GetConversationsResponse +getConversations domain (GetConversationsRequest uid cids) = do let ruid = toRemoteUnsafe domain uid loc <- qualifyLocal () - F.GetConversationsResponse + GetConversationsResponse . mapMaybe (Mapping.conversationToRemote (tDomain loc) ruid) <$> E.getConversations cids @@ -209,7 +209,7 @@ onConversationUpdated :: Member P.TinyLog r ) => Domain -> - F.ConversationUpdate -> + ConversationUpdate -> Sem r EmptyResponse onConversationUpdated requestingDomain cu = do let rcu = toRemoteUnsafe requestingDomain cu @@ -232,18 +232,18 @@ leaveConversation :: Member TinyLog r ) => Domain -> - F.LeaveConversationRequest -> - Sem r F.LeaveConversationResponse + LeaveConversationRequest -> + Sem r LeaveConversationResponse leaveConversation requestingDomain lc = do let leaver = Qualified lc.leaver requestingDomain lcnv <- qualifyLocal lc.convId res <- runError - . mapToRuntimeError @'ConvNotFound F.RemoveFromConversationErrorNotFound - . mapToRuntimeError @('ActionDenied 'LeaveConversation) F.RemoveFromConversationErrorRemovalNotAllowed - . mapToRuntimeError @'InvalidOperation F.RemoveFromConversationErrorRemovalNotAllowed - . mapError @NoChanges (const F.RemoveFromConversationErrorUnchanged) + . mapToRuntimeError @'ConvNotFound RemoveFromConversationErrorNotFound + . mapToRuntimeError @('ActionDenied 'LeaveConversation) RemoveFromConversationErrorRemovalNotAllowed + . mapToRuntimeError @'InvalidOperation RemoveFromConversationErrorRemovalNotAllowed + . mapError @NoChanges (const RemoveFromConversationErrorUnchanged) $ do (conv, _self) <- getConversationAndMemberWithError @'ConvNotFound leaver lcnv outcome <- @@ -262,7 +262,7 @@ leaveConversation requestingDomain lc = do Right _ -> pure conv case res of - Left e -> pure $ F.LeaveConversationResponse (Left e) + Left e -> pure $ LeaveConversationResponse (Left e) Right conv -> do let remotes = filter ((== qDomain leaver) . tDomain) (rmId <$> Data.convRemoteMembers conv) let botsAndMembers = BotsAndMembers mempty (Set.fromList remotes) mempty @@ -283,7 +283,7 @@ leaveConversation requestingDomain lc = do throw . internalErr $ e Right _ -> pure () - pure $ F.LeaveConversationResponse (Right ()) + pure $ LeaveConversationResponse (Right ()) where internalErr = InternalErrorWithDescription . LT.pack . displayException @@ -298,17 +298,17 @@ onMessageSent :: Member P.TinyLog r ) => Domain -> - F.RemoteMessage ConvId -> + RemoteMessage ConvId -> Sem r EmptyResponse onMessageSent domain rmUnqualified = do let rm = fmap (toRemoteUnsafe domain) rmUnqualified convId = tUntagged rm.conversation msgMetadata = MessageMetadata - { mmNativePush = F.push rm, - mmTransient = F.transient rm, - mmNativePriority = F.priority rm, - mmData = F._data rm + { mmNativePush = push rm, + mmTransient = transient rm, + mmNativePriority = priority rm, + mmData = _data rm } recipientMap = userClientMap rm.recipients msgs = toMapOf (itraversed <.> itraversed) recipientMap @@ -354,13 +354,13 @@ sendMessage :: Member P.TinyLog r ) => Domain -> - F.ProteusMessageSendRequest -> - Sem r F.MessageSendResponse + ProteusMessageSendRequest -> + Sem r MessageSendResponse sendMessage originDomain msr = do let sender = Qualified msr.sender originDomain msg <- either throwErr pure (fromProto (fromBase64ByteString msr.rawMessage)) lcnv <- qualifyLocal msr.convId - F.MessageSendResponse <$> postQualifiedOtrMessage User sender Nothing lcnv msg + MessageSendResponse <$> postQualifiedOtrMessage User sender Nothing lcnv msg where throwErr = throw . InvalidPayload . LT.pack @@ -379,12 +379,12 @@ onUserDeleted :: Member TinyLog r ) => Domain -> - F.UserDeletedConversationsNotification -> + UserDeletedConversationsNotification -> Sem r EmptyResponse onUserDeleted origDomain udcn = do let deletedUser = toRemoteUnsafe origDomain udcn.user untaggedDeletedUser = tUntagged deletedUser - convIds = F.conversations udcn + convIds = conversations udcn E.spawnMany $ fromRange convIds <&> \c -> do @@ -445,14 +445,14 @@ updateConversation :: Member (Input (Local ())) r ) => Domain -> - F.ConversationUpdateRequest -> - Sem r F.ConversationUpdateResponse + ConversationUpdateRequest -> + Sem r ConversationUpdateResponse updateConversation origDomain updateRequest = do loc <- qualifyLocal () let rusr = toRemoteUnsafe origDomain updateRequest.user lcnv = qualifyAs loc updateRequest.convId - mkResponse $ case F.action updateRequest of + mkResponse $ case action updateRequest of SomeConversationAction tag action -> case tag of SConversationJoinTag -> mapToGalleyError @(HasConversationActionGalleyErrors 'ConversationJoinTag) @@ -499,15 +499,15 @@ updateConversation origDomain updateRequest = do $ updateLocalConversation @'ConversationAccessDataTag lcnv (tUntagged rusr) Nothing action where mkResponse = - fmap (either F.ConversationUpdateResponseError Imports.id) + fmap (either ConversationUpdateResponseError Imports.id) . runError @GalleyError - . fmap (fromRight F.ConversationUpdateResponseNoChanges) + . fmap (fromRight ConversationUpdateResponseNoChanges) . runError @NoChanges - . fmap (either F.ConversationUpdateResponseNonFederatingBackends Imports.id) + . fmap (either ConversationUpdateResponseNonFederatingBackends Imports.id) . runError @NonFederatingBackends - . fmap (either F.ConversationUpdateResponseUnreachableBackends id) + . fmap (either ConversationUpdateResponseUnreachableBackends Imports.id) . runError @UnreachableBackends - . fmap F.ConversationUpdateResponseUpdate + . fmap ConversationUpdateResponseUpdate handleMLSMessageErrors :: ( r1 @@ -521,18 +521,18 @@ handleMLSMessageErrors :: ': r ) ) => - Sem r1 F.MLSMessageResponse -> - Sem r F.MLSMessageResponse + Sem r1 MLSMessageResponse -> + Sem r MLSMessageResponse handleMLSMessageErrors = - fmap (either (F.MLSMessageResponseProtocolError . unTagged) Imports.id) + fmap (either (MLSMessageResponseProtocolError . unTagged) Imports.id) . runError @MLSProtocolError - . fmap (either F.MLSMessageResponseError Imports.id) + . fmap (either MLSMessageResponseError Imports.id) . runError - . fmap (either (F.MLSMessageResponseProposalFailure . pfInner) Imports.id) + . fmap (either (MLSMessageResponseProposalFailure . pfInner) Imports.id) . runError - . fmap (either F.MLSMessageResponseNonFederatingBackends Imports.id) + . fmap (either MLSMessageResponseNonFederatingBackends Imports.id) . runError - . fmap (either (F.MLSMessageResponseUnreachableBackends . Set.fromList . (.backends)) id) + . fmap (either (MLSMessageResponseUnreachableBackends . Set.fromList . (.backends)) Imports.id) . runError @UnreachableBackends . mapToGalleyError @MLSBundleStaticErrors @@ -557,8 +557,8 @@ sendMLSCommitBundle :: Member ProposalStore r ) => Domain -> - F.MLSMessageSendRequest -> - Sem r F.MLSMessageResponse + MLSMessageSendRequest -> + Sem r MLSMessageResponse sendMLSCommitBundle remoteDomain msr = handleMLSMessageErrors $ do assertMLSEnabled loc <- qualifyLocal () @@ -566,8 +566,8 @@ sendMLSCommitBundle remoteDomain msr = handleMLSMessageErrors $ do bundle <- either (throw . mlsProtocolError) pure $ deserializeCommitBundle (fromBase64ByteString msr.rawMessage) let msg = rmValue (cbCommitMsg bundle) qcnv <- E.getConversationIdByGroupId (msgGroupId msg) >>= noteS @'ConvNotFound - when (Conv (qUnqualified qcnv) /= F.convOrSubId msr) $ throwS @'MLSGroupConversationMismatch - uncurry F.MLSMessageResponseUpdates . (,mempty) . map lcuUpdate + when (Conv (qUnqualified qcnv) /= convOrSubId msr) $ throwS @'MLSGroupConversationMismatch + MLSMessageResponseUpdates . map lcuUpdate <$> postMLSCommitBundle loc (tUntagged sender) Nothing qcnv Nothing bundle sendMLSMessage :: @@ -591,8 +591,8 @@ sendMLSMessage :: Member ProposalStore r ) => Domain -> - F.MLSMessageSendRequest -> - Sem r F.MLSMessageResponse + MLSMessageSendRequest -> + Sem r MLSMessageResponse sendMLSMessage remoteDomain msr = handleMLSMessageErrors $ do assertMLSEnabled loc <- qualifyLocal () @@ -601,9 +601,10 @@ sendMLSMessage remoteDomain msr = handleMLSMessageErrors $ do case rmValue raw of SomeMessage _ msg -> do qcnv <- E.getConversationIdByGroupId (msgGroupId msg) >>= noteS @'ConvNotFound - when (Conv (qUnqualified qcnv) /= F.convOrSubId msr) $ throwS @'MLSGroupConversationMismatch - uncurry F.MLSMessageResponseUpdates - . first (map lcuUpdate) + when (Conv (qUnqualified qcnv) /= convOrSubId msr) $ throwS @'MLSGroupConversationMismatch + MLSMessageResponseUpdates + . map lcuUpdate + . fst <$> postMLSMessage loc (tUntagged sender) Nothing qcnv Nothing raw class ToGalleyRuntimeError (effs :: EffectRow) r where @@ -637,10 +638,10 @@ mlsSendWelcome :: Member (Input UTCTime) r ) => Domain -> - F.MLSWelcomeRequest -> - Sem r F.MLSWelcomeResponse -mlsSendWelcome _origDomain (fromBase64ByteString . F.mlsWelcomeRequest -> rawWelcome) = - fmap (either (const F.MLSWelcomeMLSNotEnabled) (const F.MLSWelcomeSent)) + MLSWelcomeRequest -> + Sem r MLSWelcomeResponse +mlsSendWelcome _origDomain (fromBase64ByteString . mlsWelcomeRequest -> rawWelcome) = + fmap (either (const MLSWelcomeMLSNotEnabled) (const MLSWelcomeSent)) . runError @(Tagged 'MLSNotEnabled ()) $ do assertMLSEnabled @@ -669,10 +670,11 @@ onMLSMessageSent :: Member P.TinyLog r ) => Domain -> - F.RemoteMLSMessage -> - Sem r F.RemoteMLSMessageResponse + RemoteMLSMessage -> + Sem r EmptyResponse onMLSMessageSent domain rmm = - fmap (either (const F.RemoteMLSMessageMLSNotEnabled) (const F.RemoteMLSMessageOk)) + (EmptyResponse <$) + . (logError =<<) . runError @(Tagged 'MLSNotEnabled ()) $ do assertMLSEnabled @@ -699,6 +701,15 @@ onMLSMessageSent domain rmm = runMessagePush loc (Just (tUntagged rcnv)) $ newMessagePush mempty Nothing rmm.metadata recipients e + where + logError :: Member P.TinyLog r => Either (Tagged 'MLSNotEnabled ()) () -> Sem r () + logError (Left _) = + P.warn $ + Log.field "conversation" (toByteString' rmm.conversation) + Log.~~ Log.field "domain" (toByteString' domain) + Log.~~ Log.msg + ("Cannot process remote MLS message because MLS is disabled on this backend" :: ByteString) + logError _ = pure () queryGroupInfo :: ( Member ConversationStore r, @@ -707,10 +718,10 @@ queryGroupInfo :: Member MemberStore r ) => Domain -> - F.GetGroupInfoRequest -> - Sem r F.GetGroupInfoResponse + GetGroupInfoRequest -> + Sem r GetGroupInfoResponse queryGroupInfo origDomain req = - fmap (either F.GetGroupInfoResponseError F.GetGroupInfoResponseState) + fmap (either GetGroupInfoResponseError GetGroupInfoResponseState) . runError @GalleyError . mapToGalleyError @MLSGroupInfoStaticErrors $ do @@ -731,9 +742,9 @@ updateTypingIndicator :: Member (Input (Local ())) r ) => Domain -> - F.TypingDataUpdateRequest -> - Sem r F.TypingDataUpdateResponse -updateTypingIndicator origDomain F.TypingDataUpdateRequest {..} = do + TypingDataUpdateRequest -> + Sem r TypingDataUpdateResponse +updateTypingIndicator origDomain TypingDataUpdateRequest {..} = do let qusr = Qualified userId origDomain lcnv <- qualifyLocal convId @@ -743,15 +754,15 @@ updateTypingIndicator origDomain F.TypingDataUpdateRequest {..} = do (conv, _) <- getConversationAndMemberWithError @'ConvNotFound qusr lcnv notifyTypingIndicator conv qusr Nothing typingStatus - pure (either F.TypingDataUpdateError F.TypingDataUpdateSuccess ret) + pure (either TypingDataUpdateError TypingDataUpdateSuccess ret) onTypingIndicatorUpdated :: ( Member GundeckAccess r ) => Domain -> - F.TypingDataUpdated -> + TypingDataUpdated -> Sem r EmptyResponse -onTypingIndicatorUpdated origDomain F.TypingDataUpdated {..} = do +onTypingIndicatorUpdated origDomain TypingDataUpdated {..} = do let qcnv = Qualified convId origDomain pushTypingIndicatorEvents origUserId time usersInConv Nothing qcnv typingStatus pure EmptyResponse diff --git a/services/galley/src/Galley/API/MLS/Message.hs b/services/galley/src/Galley/API/MLS/Message.hs index 781e8258f2..7781b7379b 100644 --- a/services/galley/src/Galley/API/MLS/Message.hs +++ b/services/galley/src/Galley/API/MLS/Message.hs @@ -33,7 +33,6 @@ import Data.Domain import Data.Id import Data.Json.Util import Data.List.NonEmpty (NonEmpty, nonEmpty) -import Data.List.NonEmpty qualified as NE import Data.Map qualified as Map import Data.Qualified import Data.Set qualified as Set @@ -311,7 +310,6 @@ postMLSCommitBundleToRemoteConv :: ( Member BrigAccess r, Members MLSBundleStaticErrors r, Member (Error FederationError) r, - Member (Error InternalError) r, Member (Error MLSProtocolError) r, Member (Error MLSProposalFailure) r, Member (Error NonFederatingBackends) r, @@ -347,13 +345,7 @@ postMLSCommitBundleToRemoteConv loc qusr con bundle rcnv = do MLSMessageResponseProtocolError e -> throw (mlsProtocolError e) MLSMessageResponseProposalFailure e -> throw (MLSProposalFailure e) MLSMessageResponseUnreachableBackends ds -> throw (UnreachableBackends (toList ds)) - MLSMessageResponseUpdates updates unreachables -> do - for_ unreachables $ \us -> - throw . InternalErrorWithDescription $ - "A commit to a remote conversation should not ever return a \ - \non-empty list of users an application message could not be \ - \sent to. The remote end returned: " - <> LT.pack (intercalate ", " (show <$> NE.toList (unreachableUsers us))) + MLSMessageResponseUpdates updates -> do fmap fst . runOutputList . runInputConst (void loc) $ for_ updates $ \update -> do me <- updateLocalStateOfRemoteConv (qualifyAs rcnv update) con @@ -538,12 +530,12 @@ postMLSMessageToRemoteConv loc qusr _senderClient con smsg rcnv = do \not ever return a non-empty list of domains a commit could not be \ \sent to. The remote end returned: " <> LT.pack (intercalate ", " (show <$> Set.toList (Set.map domainText ds))) - MLSMessageResponseUpdates updates unreachables -> do + MLSMessageResponseUpdates updates -> do lcus <- fmap fst . runOutputList $ for_ updates $ \update -> do me <- updateLocalStateOfRemoteConv (qualifyAs rcnv update) con for_ me $ \e -> output (LocalConversationUpdate e update) - pure (lcus, unreachables) + pure (lcus, Nothing) MLSMessageResponseNonFederatingBackends e -> throw e type HasProposalEffects r = diff --git a/services/galley/src/Galley/API/MLS/Propagate.hs b/services/galley/src/Galley/API/MLS/Propagate.hs index d40758fa73..433d9f791d 100644 --- a/services/galley/src/Galley/API/MLS/Propagate.hs +++ b/services/galley/src/Galley/API/MLS/Propagate.hs @@ -18,8 +18,6 @@ module Galley.API.MLS.Propagate where import Control.Comonad -import Data.Aeson qualified as A -import Data.Domain import Data.Id import Data.Json.Util import Data.Map qualified as Map @@ -33,17 +31,12 @@ import Galley.Effects import Galley.Effects.FederatorAccess import Galley.Types.Conversations.Members import Imports -import Network.Wai.Utilities.JSONResponse import Polysemy import Polysemy.Input import Polysemy.TinyLog hiding (trace) -import System.Logger.Class qualified as Logger -import Wire.API.Error -import Wire.API.Error.Galley import Wire.API.Event.Conversation import Wire.API.Federation.API import Wire.API.Federation.API.Galley -import Wire.API.Federation.Error import Wire.API.Message import Wire.API.Unreachable @@ -81,19 +74,19 @@ propagateMessage qusr lconv cm con raw = do runMessagePush lconv (Just qcnv) (mkPush u c) -- send to remotes - unreachableFromList . concat - <$$> traverse handleError - <=< runFederatedConcurrentlyEither (map remoteMemberQualify rmems) - $ \(tUnqualified -> rs) -> - fedClient @'Galley @"on-mls-message-sent" $ - RemoteMLSMessage - { time = now, - sender = qusr, - metadata = mm, - conversation = tUnqualified lcnv, - recipients = rs >>= remoteMemberMLSClients, - message = Base64ByteString raw - } + void $ + runFederatedConcurrentlyEither (map remoteMemberQualify rmems) $ + \(tUnqualified -> rs) -> + fedClient @'Galley @"on-mls-message-sent" $ + RemoteMLSMessage + { time = now, + sender = qusr, + metadata = mm, + conversation = tUnqualified lcnv, + recipients = rs >>= remoteMemberMLSClients, + message = Base64ByteString raw + } + pure Nothing where localMemberMLSClients :: Local x -> LocalMember -> [(UserId, ClientId)] localMemberMLSClients loc lm = @@ -110,24 +103,3 @@ propagateMessage qusr lconv cm con raw = do in map (\(c, _) -> (remoteUserId, c)) (toList (Map.findWithDefault mempty remoteUserQId cm)) - - remotesToQIds = fmap (tUntagged . rmId) - - handleError :: - Member TinyLog r => - Either (Remote [RemoteMember], FederationError) (Remote RemoteMLSMessageResponse) -> - Sem r [Qualified UserId] - handleError (Right x) = case tUnqualified x of - RemoteMLSMessageOk -> pure [] - RemoteMLSMessageMLSNotEnabled -> do - logFedError x (errorToResponse @'MLSNotEnabled) - pure [] - handleError (Left (r, e)) = do - logFedError r (toResponse e) - pure $ remotesToQIds (tUnqualified r) - logFedError :: Member TinyLog r => Remote x -> JSONResponse -> Sem r () - logFedError r e = - warn $ - Logger.msg ("A message could not be delivered to a remote backend" :: ByteString) - . Logger.field "remote_domain" (domainText (tDomain r)) - . Logger.field "error" (A.encode e.value) diff --git a/services/galley/src/Galley/API/Util.hs b/services/galley/src/Galley/API/Util.hs index 9fd9d441d8..d441ee02fd 100644 --- a/services/galley/src/Galley/API/Util.hs +++ b/services/galley/src/Galley/API/Util.hs @@ -37,6 +37,7 @@ import Data.Set qualified as Set import Data.Singletons import Data.Text qualified as T import Data.Time +import GHC.TypeLits import Galley.API.Error import Galley.API.Mapping import Galley.Data.Conversation qualified as Data @@ -67,6 +68,7 @@ import Polysemy import Polysemy.Error import Polysemy.Input import Polysemy.TinyLog qualified as P +import System.Logger qualified as Log import Wire.API.Connection import Wire.API.Conversation hiding (Member, cnvAccess, cnvAccessRoles, cnvName, cnvType) import Wire.API.Conversation qualified as Public @@ -1008,3 +1010,13 @@ instance if err' == demote @e then throwS @e else rethrowErrors @effs @r err' + +logRemoteNotificationError :: + forall rpc r. + (Member P.TinyLog r, KnownSymbol rpc) => + FederationError -> + Sem r () +logRemoteNotificationError e = + P.warn $ + Log.field "federation call" (symbolVal (Proxy @rpc)) + . Log.msg (displayException e) diff --git a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs index ac006ded4c..bdefa14631 100644 --- a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs +++ b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs @@ -22,6 +22,6 @@ data BackendNotificationQueueAccess m a where Q.DeliveryMode -> f (Remote x) -> (Remote [x] -> FedQueueClient c a) -> - BackendNotificationQueueAccess m [Either (Remote ([x], FederationError)) (Remote a)] + BackendNotificationQueueAccess m (Either FederationError [Remote a]) makeSem ''BackendNotificationQueueAccess diff --git a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs index b9affe4858..411897a083 100644 --- a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs +++ b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs @@ -4,8 +4,8 @@ module Galley.Intra.BackendNotificationQueue (interpretBackendNotificationQueueA import Control.Lens (view) import Control.Monad.Catch +import Control.Monad.Trans.Except import Control.Retry -import Data.Bifunctor import Data.Domain import Data.Qualified import Galley.Effects.BackendNotificationQueueAccess (BackendNotificationQueueAccess (..)) @@ -29,22 +29,21 @@ interpretBackendNotificationQueueAccess :: Sem r a interpretBackendNotificationQueueAccess = interpret $ \case EnqueueNotification remote deliveryMode action -> do - embedApp $ enqueueNotification (tDomain remote) deliveryMode action + embedApp . runExceptT $ enqueueNotification (tDomain remote) deliveryMode action EnqueueNotificationsConcurrently m xs rpc -> do - embedApp $ enqueueNotificationsConcurrently m xs rpc + embedApp . runExceptT $ enqueueNotificationsConcurrently m xs rpc -enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c a -> App (Either FederationError a) -enqueueNotification remoteDomain deliveryMode action = do - mChanVar <- view rabbitmqChannel +getChannel :: ExceptT FederationError App (MVar Q.Channel) +getChannel = view rabbitmqChannel >>= maybe (throwE FederationNotConfigured) pure + +enqueueSingleNotification :: Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c a -> App a +enqueueSingleNotification remoteDomain deliveryMode chanVar action = do ownDomain <- view (options . settings . federationDomain) - case mChanVar of - Nothing -> pure (Left FederationNotConfigured) - Just chanVar -> do - let policy = limitRetries 3 <> constantDelay 1_000_000 - handlers = - skipAsyncExceptions - <> [logRetries (const $ pure True) logError] - Right <$> recovering policy handlers (const $ go ownDomain chanVar) + let policy = limitRetries 3 <> constantDelay 1_000_000 + handlers = + skipAsyncExceptions + <> [logRetries (const $ pure True) logError] + recovering policy handlers (const $ go ownDomain) where logError willRetry (SomeException e) status = do Log.err $ @@ -52,25 +51,29 @@ enqueueNotification remoteDomain deliveryMode action = do . Log.field "error" (displayException e) . Log.field "willRetry" willRetry . Log.field "retryCount" status.rsIterNumber - go ownDomain chanVar = do + go ownDomain = do mChan <- timeout 1_000_000 (readMVar chanVar) case mChan of Nothing -> throwM NoRabbitMqChannel Just chan -> do liftIO $ enqueue chan ownDomain remoteDomain deliveryMode action +enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c a -> ExceptT FederationError App a +enqueueNotification remoteDomain deliveryMode action = do + chanVar <- getChannel + lift $ enqueueSingleNotification remoteDomain deliveryMode chanVar action + enqueueNotificationsConcurrently :: (Foldable f, Functor f) => Q.DeliveryMode -> f (Remote x) -> (Remote [x] -> FedQueueClient c a) -> - App [(Either (Remote ([x], FederationError)) (Remote a))] -enqueueNotificationsConcurrently m xs f = - pooledForConcurrentlyN 8 (bucketRemote xs) $ \r -> - bimap - (qualifyAs r . (tUnqualified r,)) - (qualifyAs r) - <$> enqueueNotification (tDomain r) m (f r) + ExceptT FederationError App [Remote a] +enqueueNotificationsConcurrently m xs f = do + chanVar <- getChannel + lift $ pooledForConcurrentlyN 8 (bucketRemote xs) $ \r -> + qualifyAs r + <$> enqueueSingleNotification (tDomain r) m chanVar (f r) data NoRabbitMqChannel = NoRabbitMqChannel deriving (Show) diff --git a/services/galley/test/integration/API/MLS/Mocks.hs b/services/galley/test/integration/API/MLS/Mocks.hs index 1b58cd9df5..10974405b8 100644 --- a/services/galley/test/integration/API/MLS/Mocks.hs +++ b/services/galley/test/integration/API/MLS/Mocks.hs @@ -80,7 +80,6 @@ sendMessageMock = "send-mls-message" ~> MLSMessageResponseUpdates [] - mempty claimKeyPackagesMock :: KeyPackageBundle -> Mock LByteString claimKeyPackagesMock kpb = "claim-key-packages" ~> kpb