diff --git a/changelog.d/6-federation/WPB-4928-notification-endpoints b/changelog.d/6-federation/WPB-4928-notification-endpoints new file mode 100644 index 0000000000..b900bd9573 --- /dev/null +++ b/changelog.d/6-federation/WPB-4928-notification-endpoints @@ -0,0 +1 @@ +Reorganise the federation API such that queueing notification endpoints are separate from synchronous endpoints. Also simplify queueing federation notification endpoints. diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API.hs b/libs/wire-api-federation/src/Wire/API/Federation/API.hs index 5e6b294e12..b1859df233 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API.hs @@ -33,10 +33,13 @@ module Wire.API.Federation.API ) where +import Data.Aeson +import Data.Domain import Data.Kind import Data.Proxy import GHC.TypeLits import Imports +import Network.AMQP import Servant import Servant.Client import Servant.Client.Core @@ -46,6 +49,8 @@ import Wire.API.Federation.API.Common import Wire.API.Federation.API.Galley import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client +import Wire.API.Federation.Component +import Wire.API.Federation.HasNotificationEndpoint import Wire.API.MakesFederatedCall import Wire.API.Routes.Named @@ -94,14 +99,32 @@ fedClient :: fedClient = clientIn (Proxy @api) (Proxy @m) fedQueueClient :: - forall (comp :: Component) (name :: Symbol) m api. - ( HasEmptyResponse api, - HasFedEndpoint comp api name, - HasClient m api, - m ~ FedQueueClient comp + forall tag api. + ( HasNotificationEndpoint tag, + -- FUTUREWORK: Include this API constraint and get it working + -- api ~ NotificationAPI tag (NotificationComponent tag), + HasEmptyResponse api, + KnownSymbol (NotificationPath tag), + KnownComponent (NotificationComponent tag), + ToJSON (Payload tag), + HasFedEndpoint (NotificationComponent tag) api (NotificationPath tag) ) => - Client m api -fedQueueClient = clientIn (Proxy @api) (Proxy @m) + Payload tag -> + FedQueueClient (NotificationComponent tag) () +fedQueueClient payload = do + env <- ask + let notif = fedNotifToBackendNotif @tag env.originDomain payload + msg = + newMsg + { msgBody = encode notif, + msgDeliveryMode = Just (env.deliveryMode), + msgContentType = Just "application/json" + } + -- Empty string means default exchange + exchange = "" + liftIO $ do + ensureQueue env.channel env.targetDomain._domainText + void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg fedClientIn :: forall (comp :: Component) (name :: Symbol) m api. diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Brig.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Brig.hs index c2a25af65b..8703e3d850 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API/Brig.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Brig.hs @@ -15,17 +15,20 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.API.Federation.API.Brig where +module Wire.API.Federation.API.Brig + ( module Notifications, + module Wire.API.Federation.API.Brig, + ) +where import Data.Aeson import Data.Domain (Domain) import Data.Handle (Handle) import Data.Id -import Data.Range import Imports import Servant.API import Test.QuickCheck (Arbitrary) -import Wire.API.Federation.API.Common +import Wire.API.Federation.API.Brig.Notifications as Notifications import Wire.API.Federation.Endpoint import Wire.API.Federation.Version import Wire.API.MLS.CipherSuite @@ -70,9 +73,11 @@ type BrigApi = :<|> FedEndpoint "get-user-clients" GetUserClients (UserMap (Set PubClient)) :<|> FedEndpoint "get-mls-clients" MLSClientsRequest (Set ClientInfo) :<|> FedEndpoint "send-connection-action" NewConnectionRequest NewConnectionResponse - :<|> FedEndpoint "on-user-deleted-connections" UserDeletedConnectionsNotification EmptyResponse :<|> FedEndpoint "claim-key-packages" ClaimKeyPackageRequest (Maybe KeyPackageBundle) :<|> FedEndpoint "get-not-fully-connected-backends" DomainSet NonConnectedBackends + -- All the notification endpoints that go through the queue-based + -- federation client ('fedQueueClient'). + :<|> BrigNotificationAPI newtype DomainSet = DomainSet { domains :: Set Domain @@ -143,18 +148,6 @@ data NewConnectionResponse deriving (Arbitrary) via (GenericUniform NewConnectionResponse) deriving (FromJSON, ToJSON) via (CustomEncoded NewConnectionResponse) -type UserDeletedNotificationMaxConnections = 1000 - -data UserDeletedConnectionsNotification = UserDeletedConnectionsNotification - { -- | This is qualified implicitly by the origin domain - user :: UserId, - -- | These are qualified implicitly by the target domain - connections :: Range 1 UserDeletedNotificationMaxConnections [UserId] - } - deriving stock (Eq, Show, Generic) - deriving (Arbitrary) via (GenericUniform UserDeletedConnectionsNotification) - deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConnectionsNotification) - data ClaimKeyPackageRequest = ClaimKeyPackageRequest { -- | The user making the request, implictly qualified by the origin domain. claimant :: UserId, diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Brig/Notifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Brig/Notifications.hs new file mode 100644 index 0000000000..efdc16722b --- /dev/null +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Brig/Notifications.hs @@ -0,0 +1,56 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.API.Federation.API.Brig.Notifications where + +import Data.Aeson +import Data.Id +import Data.Range +import Imports +import Wire.API.Federation.Component +import Wire.API.Federation.Endpoint +import Wire.API.Federation.HasNotificationEndpoint +import Wire.API.Util.Aeson +import Wire.Arbitrary + +type UserDeletedNotificationMaxConnections = 1000 + +data UserDeletedConnectionsNotification = UserDeletedConnectionsNotification + { -- | This is qualified implicitly by the origin domain + user :: UserId, + -- | These are qualified implicitly by the target domain + connections :: Range 1 UserDeletedNotificationMaxConnections [UserId] + } + deriving stock (Eq, Show, Generic) + deriving (Arbitrary) via (GenericUniform UserDeletedConnectionsNotification) + deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConnectionsNotification) + +data BrigNotificationTag = OnUserDeletedConnectionsTag + deriving (Show, Eq, Generic, Bounded, Enum) + +instance HasNotificationEndpoint 'OnUserDeletedConnectionsTag where + type Payload 'OnUserDeletedConnectionsTag = UserDeletedConnectionsNotification + type NotificationPath 'OnUserDeletedConnectionsTag = "on-user-deleted-connections" + type NotificationComponent 'OnUserDeletedConnectionsTag = 'Brig + type + NotificationAPI 'OnUserDeletedConnectionsTag 'Brig = + NotificationFedEndpoint 'OnUserDeletedConnectionsTag + +-- | All the notification endpoints return an 'EmptyResponse'. +type BrigNotificationAPI = + -- FUTUREWORK: Use NotificationAPI 'OnUserDeletedConnectionsTag 'Brig instead + NotificationFedEndpoint 'OnUserDeletedConnectionsTag diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs index a635ee1cbf..f40417e303 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley.hs @@ -15,16 +15,18 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.API.Federation.API.Galley where +module Wire.API.Federation.API.Galley + ( module Wire.API.Federation.API.Galley, + module Notifications, + ) +where import Data.Aeson (FromJSON, ToJSON) import Data.Domain import Data.Id import Data.Json.Util -import Data.List.NonEmpty (NonEmpty) import Data.Misc (Milliseconds) import Data.Qualified -import Data.Range import Data.Time.Clock (UTCTime) import Imports import Network.Wai.Utilities.JSONResponse @@ -36,12 +38,13 @@ import Wire.API.Conversation.Role (RoleName) import Wire.API.Conversation.Typing import Wire.API.Error.Galley import Wire.API.Federation.API.Common +import Wire.API.Federation.API.Galley.Notifications as Notifications import Wire.API.Federation.Endpoint import Wire.API.MLS.SubConversation import Wire.API.MakesFederatedCall import Wire.API.Message import Wire.API.Routes.Public.Galley.Messaging -import Wire.API.Util.Aeson (CustomEncoded (..), CustomEncodedLensable (..)) +import Wire.API.Util.Aeson (CustomEncoded (..)) import Wire.Arbitrary (Arbitrary, GenericUniform (..)) -- FUTUREWORK: data types, json instances, more endpoints. See @@ -58,9 +61,6 @@ type GalleyApi = -- This endpoint is called the first time a user from this backend is -- added to a remote conversation. :<|> FedEndpoint "get-conversations" GetConversationsRequest GetConversationsResponse - -- used by the backend that owns a conversation to inform this backend of - -- changes to the conversation - :<|> FedEndpoint "on-conversation-updated" ConversationUpdate EmptyResponse :<|> FedEndpointWithMods '[ MakesFederatedCall 'Galley "on-conversation-updated", MakesFederatedCall 'Galley "on-mls-message-sent", @@ -70,9 +70,6 @@ type GalleyApi = "leave-conversation" LeaveConversationRequest LeaveConversationResponse - -- used to notify this backend that a new message has been posted to a - -- remote conversation - :<|> FedEndpoint "on-message-sent" (RemoteMessage ConvId) EmptyResponse -- used by a remote backend to send a message to a conversation owned by -- this backend :<|> FedEndpointWithMods @@ -82,14 +79,6 @@ type GalleyApi = "send-message" ProteusMessageSendRequest MessageSendResponse - :<|> FedEndpointWithMods - '[ MakesFederatedCall 'Galley "on-mls-message-sent", - MakesFederatedCall 'Galley "on-conversation-updated", - MakesFederatedCall 'Brig "api-version" - ] - "on-user-deleted-conversations" - UserDeletedConversationsNotification - EmptyResponse :<|> FedEndpointWithMods '[ MakesFederatedCall 'Galley "on-conversation-updated", MakesFederatedCall 'Galley "on-mls-message-sent", @@ -100,7 +89,6 @@ type GalleyApi = ConversationUpdateRequest ConversationUpdateResponse :<|> FedEndpoint "mls-welcome" MLSWelcomeRequest MLSWelcomeResponse - :<|> FedEndpoint "on-mls-message-sent" RemoteMLSMessage EmptyResponse :<|> FedEndpointWithMods '[ MakesFederatedCall 'Galley "on-conversation-updated", MakesFederatedCall 'Galley "on-mls-message-sent", @@ -123,12 +111,6 @@ type GalleyApi = MLSMessageSendRequest MLSMessageResponse :<|> FedEndpoint "query-group-info" GetGroupInfoRequest GetGroupInfoResponse - :<|> FedEndpointWithMods - '[ MakesFederatedCall 'Galley "on-mls-message-sent" - ] - "on-client-removed" - ClientRemovedRequest - EmptyResponse :<|> FedEndpointWithMods '[ MakesFederatedCall 'Galley "on-typing-indicator-updated" ] @@ -153,6 +135,9 @@ type GalleyApi = "get-one2one-conversation" GetOne2OneConversationRequest GetOne2OneConversationResponse + -- All the notification endpoints that go through the queue-based + -- federation client ('fedQueueClient'). + :<|> GalleyNotificationAPI data TypingDataUpdateRequest = TypingDataUpdateRequest { typingStatus :: TypingStatus, @@ -180,15 +165,6 @@ data TypingDataUpdated = TypingDataUpdated deriving stock (Eq, Show, Generic) deriving (FromJSON, ToJSON) via (CustomEncoded TypingDataUpdated) -data ClientRemovedRequest = ClientRemovedRequest - { user :: UserId, - client :: ClientId, - convs :: [ConvId] - } - deriving stock (Eq, Show, Generic) - deriving (Arbitrary) via (GenericUniform ClientRemovedRequest) - deriving (FromJSON, ToJSON) via (CustomEncoded ClientRemovedRequest) - data GetConversationsRequest = GetConversationsRequest { userId :: UserId, convIds :: [ConvId] @@ -281,28 +257,6 @@ data ConversationCreated conv = ConversationCreated ccRemoteOrigUserId :: ConversationCreated (Remote ConvId) -> Remote UserId ccRemoteOrigUserId cc = qualifyAs cc.cnvId cc.origUserId -data ConversationUpdate = ConversationUpdate - { cuTime :: UTCTime, - cuOrigUserId :: Qualified UserId, - -- | The unqualified ID of the conversation where the update is happening. - -- The ID is local to the sender to prevent putting arbitrary domain that - -- is different than that of the backend making a conversation membership - -- update request. - cuConvId :: ConvId, - -- | A list of users from the receiving backend that need to be sent - -- notifications about this change. This is required as we do not expect a - -- non-conversation owning backend to have an indexed mapping of - -- conversation to users. - cuAlreadyPresentUsers :: [UserId], - -- | Information on the specific action that caused the update. - cuAction :: SomeConversationAction - } - deriving (Eq, Show, Generic) - -instance ToJSON ConversationUpdate - -instance FromJSON ConversationUpdate - data LeaveConversationRequest = LeaveConversationRequest { -- | The conversation is assumed to be owned by the target domain, which -- allows us to protect against relay attacks @@ -324,38 +278,6 @@ data RemoveFromConversationError (ToJSON, FromJSON) via (CustomEncoded RemoveFromConversationError) --- Note: this is parametric in the conversation type to allow it to be used --- both for conversations with a fixed known domain (e.g. as the argument of the --- federation RPC), and for conversations with an arbitrary Qualified or Remote id --- (e.g. as the argument of the corresponding handler). -data RemoteMessage conv = RemoteMessage - { time :: UTCTime, - _data :: Maybe Text, - sender :: Qualified UserId, - senderClient :: ClientId, - conversation :: conv, - priority :: Maybe Priority, - push :: Bool, - transient :: Bool, - recipients :: UserClientMap Text - } - deriving stock (Eq, Show, Generic, Functor) - deriving (Arbitrary) via (GenericUniform (RemoteMessage conv)) - deriving (ToJSON, FromJSON) via (CustomEncodedLensable (RemoteMessage conv)) - -data RemoteMLSMessage = RemoteMLSMessage - { time :: UTCTime, - metadata :: MessageMetadata, - sender :: Qualified UserId, - conversation :: ConvId, - subConversation :: Maybe SubConvId, - recipients :: Map UserId (NonEmpty ClientId), - message :: Base64ByteString - } - deriving stock (Eq, Show, Generic) - deriving (Arbitrary) via (GenericUniform RemoteMLSMessage) - deriving (ToJSON, FromJSON) via (CustomEncoded RemoteMLSMessage) - data RemoteMLSMessageResponse = RemoteMLSMessageOk | RemoteMLSMessageMLSNotEnabled @@ -406,18 +328,6 @@ newtype LeaveConversationResponse = LeaveConversationResponse (ToJSON, FromJSON) via (Either (CustomEncoded RemoveFromConversationError) ()) -type UserDeletedNotificationMaxConvs = 1000 - -data UserDeletedConversationsNotification = UserDeletedConversationsNotification - { -- | This is qualified implicitly by the origin domain - user :: UserId, - -- | These are qualified implicitly by the target domain - conversations :: Range 1 UserDeletedNotificationMaxConvs [ConvId] - } - deriving stock (Eq, Show, Generic) - deriving (Arbitrary) via (GenericUniform UserDeletedConversationsNotification) - deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConversationsNotification) - data ConversationUpdateRequest = ConversationUpdateRequest { -- | The user that is attempting to perform the action. This is qualified -- implicitly by the origin domain diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs new file mode 100644 index 0000000000..e5a401f394 --- /dev/null +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs @@ -0,0 +1,181 @@ +{-# OPTIONS_GHC -Wno-incomplete-patterns #-} +{-# OPTIONS_GHC -Wno-unused-matches #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.API.Federation.API.Galley.Notifications where + +import Data.Aeson +import Data.Id +import Data.Json.Util +import Data.List.NonEmpty +import Data.Qualified +import Data.Range +import Data.Time.Clock +import Imports +import Servant.API +import Wire.API.Conversation.Action +import Wire.API.Federation.Component +import Wire.API.Federation.Endpoint +import Wire.API.Federation.HasNotificationEndpoint +import Wire.API.MLS.SubConversation +import Wire.API.MakesFederatedCall +import Wire.API.Message +import Wire.API.Util.Aeson +import Wire.Arbitrary + +data GalleyNotificationTag + = OnClientRemovedTag + | OnMessageSentTag + | OnMLSMessageSentTag + | OnConversationUpdatedTag + | OnUserDeletedConversationsTag + deriving (Show, Eq, Generic, Bounded, Enum) + +instance HasNotificationEndpoint 'OnClientRemovedTag where + type Payload 'OnClientRemovedTag = ClientRemovedRequest + type NotificationPath 'OnClientRemovedTag = "on-client-removed" + type NotificationComponent 'OnClientRemovedTag = 'Galley + type + NotificationAPI 'OnClientRemovedTag 'Galley = + NotificationFedEndpointWithMods + '[ MakesFederatedCall 'Galley "on-mls-message-sent" + ] + (NotificationPath 'OnClientRemovedTag) + (Payload 'OnClientRemovedTag) + +instance HasNotificationEndpoint 'OnMessageSentTag where + type Payload 'OnMessageSentTag = RemoteMessage ConvId + type NotificationPath 'OnMessageSentTag = "on-message-sent" + type NotificationComponent 'OnMessageSentTag = 'Galley + + -- used to notify this backend that a new message has been posted to a + -- remote conversation + type NotificationAPI 'OnMessageSentTag 'Galley = NotificationFedEndpoint 'OnMessageSentTag + +instance HasNotificationEndpoint 'OnMLSMessageSentTag where + type Payload 'OnMLSMessageSentTag = RemoteMLSMessage + type NotificationPath 'OnMLSMessageSentTag = "on-mls-message-sent" + type NotificationComponent 'OnMLSMessageSentTag = 'Galley + type NotificationAPI 'OnMLSMessageSentTag 'Galley = NotificationFedEndpoint 'OnMLSMessageSentTag + +instance HasNotificationEndpoint 'OnConversationUpdatedTag where + type Payload 'OnConversationUpdatedTag = ConversationUpdate + type NotificationPath 'OnConversationUpdatedTag = "on-conversation-updated" + type NotificationComponent 'OnConversationUpdatedTag = 'Galley + + -- used by the backend that owns a conversation to inform this backend of + -- changes to the conversation + type NotificationAPI 'OnConversationUpdatedTag 'Galley = NotificationFedEndpoint 'OnConversationUpdatedTag + +instance HasNotificationEndpoint 'OnUserDeletedConversationsTag where + type Payload 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification + type NotificationPath 'OnUserDeletedConversationsTag = "on-user-deleted-conversations" + type NotificationComponent 'OnUserDeletedConversationsTag = 'Galley + type + NotificationAPI 'OnUserDeletedConversationsTag 'Galley = + NotificationFedEndpointWithMods + '[ MakesFederatedCall 'Galley "on-mls-message-sent", + MakesFederatedCall 'Galley "on-conversation-updated", + MakesFederatedCall 'Brig "api-version" + ] + (NotificationPath 'OnUserDeletedConversationsTag) + (Payload 'OnUserDeletedConversationsTag) + +-- | All the notification endpoints return an 'EmptyResponse'. +type GalleyNotificationAPI = + NotificationAPI 'OnClientRemovedTag 'Galley + :<|> NotificationAPI 'OnMessageSentTag 'Galley + :<|> NotificationAPI 'OnMLSMessageSentTag 'Galley + :<|> NotificationAPI 'OnConversationUpdatedTag 'Galley + :<|> NotificationAPI 'OnUserDeletedConversationsTag 'Galley + +data ClientRemovedRequest = ClientRemovedRequest + { user :: UserId, + client :: ClientId, + convs :: [ConvId] + } + deriving stock (Eq, Show, Generic) + deriving (Arbitrary) via (GenericUniform ClientRemovedRequest) + deriving (FromJSON, ToJSON) via (CustomEncoded ClientRemovedRequest) + +-- Note: this is parametric in the conversation type to allow it to be used +-- both for conversations with a fixed known domain (e.g. as the argument of the +-- federation RPC), and for conversations with an arbitrary Qualified or Remote id +-- (e.g. as the argument of the corresponding handler). +data RemoteMessage conv = RemoteMessage + { time :: UTCTime, + _data :: Maybe Text, + sender :: Qualified UserId, + senderClient :: ClientId, + conversation :: conv, + priority :: Maybe Priority, + push :: Bool, + transient :: Bool, + recipients :: UserClientMap Text + } + deriving stock (Eq, Show, Generic, Functor) + deriving (Arbitrary) via (GenericUniform (RemoteMessage conv)) + deriving (ToJSON, FromJSON) via (CustomEncodedLensable (RemoteMessage conv)) + +data RemoteMLSMessage = RemoteMLSMessage + { time :: UTCTime, + metadata :: MessageMetadata, + sender :: Qualified UserId, + conversation :: ConvId, + subConversation :: Maybe SubConvId, + recipients :: Map UserId (NonEmpty ClientId), + message :: Base64ByteString + } + deriving stock (Eq, Show, Generic) + deriving (Arbitrary) via (GenericUniform RemoteMLSMessage) + deriving (ToJSON, FromJSON) via (CustomEncoded RemoteMLSMessage) + +data ConversationUpdate = ConversationUpdate + { cuTime :: UTCTime, + cuOrigUserId :: Qualified UserId, + -- | The unqualified ID of the conversation where the update is happening. + -- The ID is local to the sender to prevent putting arbitrary domain that + -- is different than that of the backend making a conversation membership + -- update request. + cuConvId :: ConvId, + -- | A list of users from the receiving backend that need to be sent + -- notifications about this change. This is required as we do not expect a + -- non-conversation owning backend to have an indexed mapping of + -- conversation to users. + cuAlreadyPresentUsers :: [UserId], + -- | Information on the specific action that caused the update. + cuAction :: SomeConversationAction + } + deriving (Eq, Show, Generic) + +instance ToJSON ConversationUpdate + +instance FromJSON ConversationUpdate + +type UserDeletedNotificationMaxConvs = 1000 + +data UserDeletedConversationsNotification = UserDeletedConversationsNotification + { -- | This is qualified implicitly by the origin domain + user :: UserId, + -- | These are qualified implicitly by the target domain + conversations :: Range 1 UserDeletedNotificationMaxConvs [ConvId] + } + deriving stock (Eq, Show, Generic) + deriving (Arbitrary) via (GenericUniform UserDeletedConversationsNotification) + deriving (FromJSON, ToJSON) via (CustomEncoded UserDeletedConversationsNotification) diff --git a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs index 3fa1aba287..6ad8ddde89 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs @@ -6,22 +6,15 @@ module Wire.API.Federation.BackendNotifications where import Control.Exception import Control.Monad.Except import Data.Aeson -import Data.ByteString.Builder qualified as Builder -import Data.ByteString.Lazy qualified as LBS import Data.Domain import Data.Map qualified as Map -import Data.Sequence qualified as Seq import Data.Text qualified as Text -import Data.Text.Encoding import Data.Text.Lazy.Encoding qualified as TL import Imports import Network.AMQP qualified as Q import Network.AMQP.Types qualified as Q -import Network.HTTP.Types import Servant -import Servant.Client import Servant.Client.Core -import Servant.Types.SourceT import Wire.API.Federation.API.Common import Wire.API.Federation.Client import Wire.API.Federation.Component @@ -125,7 +118,7 @@ ensureQueue chan queue = do -- queue. Perhaps none of this should be servant code anymore. But it is here to -- allow smooth transition to RabbitMQ based notification pushing. -- --- Use 'Wire.API.Federation.API.fedQueueClient' to create and action and pass it +-- Use 'Wire.API.Federation.API.fedQueueClient' to create an action and pass it -- to 'enqueue' newtype FedQueueClient c a = FedQueueClient (ReaderT FedQueueEnv IO a) deriving (Functor, Applicative, Monad, MonadIO, MonadReader FedQueueEnv) @@ -141,42 +134,3 @@ data EnqueueError = EnqueueError String deriving (Show) instance Exception EnqueueError - -instance (KnownComponent c) => RunClient (FedQueueClient c) where - runRequestAcceptStatus :: Maybe [Status] -> Request -> FedQueueClient c Response - runRequestAcceptStatus _ req = do - env <- ask - bodyLBS <- case requestBody req of - Just (RequestBodyLBS lbs, _) -> pure lbs - Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs) - Just (RequestBodySource src, _) -> liftIO $ do - errOrRes <- runExceptT $ runSourceT src - either (throwIO . EnqueueError) (pure . mconcat) errOrRes - Nothing -> pure mempty - let notif = - BackendNotification - { ownDomain = env.originDomain, - targetComponent = componentVal @c, - path = decodeUtf8 $ LBS.toStrict $ Builder.toLazyByteString req.requestPath, - body = RawJson bodyLBS - } - let msg = - Q.newMsg - { Q.msgBody = encode notif, - Q.msgDeliveryMode = Just (env.deliveryMode), - Q.msgContentType = Just "application/json" - } - -- Empty string means default exchange - exchange = "" - liftIO $ do - ensureQueue env.channel env.targetDomain._domainText - void $ Q.publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg - pure $ - Response - { responseHttpVersion = http20, - responseStatusCode = status200, - responseHeaders = Seq.singleton (hContentType, "application/json"), - responseBody = "{}" - } - throwClientError :: ClientError -> FedQueueClient c a - throwClientError = liftIO . throwIO diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs b/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs index 509e73aa61..664835848f 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs @@ -24,7 +24,9 @@ where import Data.Kind import Servant.API import Wire.API.ApplyMods +import Wire.API.Federation.API.Common import Wire.API.Federation.Domain +import Wire.API.Federation.HasNotificationEndpoint import Wire.API.Routes.Named type FedEndpointWithMods (mods :: [Type]) name input output = @@ -35,8 +37,14 @@ type FedEndpointWithMods (mods :: [Type]) name input output = (name :> OriginDomainHeader :> ReqBody '[JSON] input :> Post '[JSON] output) ) +type NotificationFedEndpointWithMods (mods :: [Type]) name input = + FedEndpointWithMods mods name input EmptyResponse + type FedEndpoint name input output = FedEndpointWithMods '[] name input output +type NotificationFedEndpoint tag = + FedEndpoint (NotificationPath tag) (Payload tag) EmptyResponse + type StreamingFedEndpoint name input output = Named name diff --git a/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs b/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs new file mode 100644 index 0000000000..d9d147b6fc --- /dev/null +++ b/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs @@ -0,0 +1,67 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.API.Federation.HasNotificationEndpoint where + +import Data.Aeson +import Data.Domain +import Data.Kind +import Data.Proxy +import Data.Text qualified as T +import GHC.TypeLits +import Imports +import Wire.API.Federation.BackendNotifications +import Wire.API.Federation.Component +import Wire.API.RawJson + +class HasNotificationEndpoint t where + -- | The type of the payload for this endpoint + type Payload t :: Type + + -- | The central path component of a notification endpoint, e.g., + -- "on-conversation-updated". + type NotificationPath t :: Symbol + + -- | The server component this endpoint is associated with + type NotificationComponent t :: Component + + -- | The Servant API endpoint type + type NotificationAPI t (c :: Component) :: Type + +-- | Convert a federation endpoint to a backend notification to be enqueued to a +-- RabbitMQ queue. +fedNotifToBackendNotif :: + forall tag. + KnownSymbol (NotificationPath tag) => + KnownComponent (NotificationComponent tag) => + ToJSON (Payload tag) => + Domain -> + Payload tag -> + BackendNotification +fedNotifToBackendNotif ownDomain payload = + let p = T.pack . symbolVal $ Proxy @(NotificationPath tag) + b = RawJson . encode $ payload + in toNotif p b + where + toNotif :: Text -> RawJson -> BackendNotification + toNotif path body = + BackendNotification + { ownDomain = ownDomain, + targetComponent = componentVal @(NotificationComponent tag), + path = path, + body = body + } diff --git a/libs/wire-api-federation/wire-api-federation.cabal b/libs/wire-api-federation/wire-api-federation.cabal index 6efd7ef2eb..3d46abff3d 100644 --- a/libs/wire-api-federation/wire-api-federation.cabal +++ b/libs/wire-api-federation/wire-api-federation.cabal @@ -18,15 +18,18 @@ library exposed-modules: Wire.API.Federation.API Wire.API.Federation.API.Brig + Wire.API.Federation.API.Brig.Notifications Wire.API.Federation.API.Cargohold Wire.API.Federation.API.Common Wire.API.Federation.API.Galley + Wire.API.Federation.API.Galley.Notifications Wire.API.Federation.BackendNotifications Wire.API.Federation.Client Wire.API.Federation.Component Wire.API.Federation.Domain Wire.API.Federation.Endpoint Wire.API.Federation.Error + Wire.API.Federation.HasNotificationEndpoint Wire.API.Federation.Version other-modules: Paths_wire_api_federation diff --git a/services/brig/src/Brig/API/Federation.hs b/services/brig/src/Brig/API/Federation.hs index f7bdf0f387..90ddd22a28 100644 --- a/services/brig/src/Brig/API/Federation.hs +++ b/services/brig/src/Brig/API/Federation.hs @@ -90,9 +90,9 @@ federationSitemap = :<|> Named @"get-user-clients" getUserClients :<|> Named @"get-mls-clients" getMLSClients :<|> Named @"send-connection-action" sendConnectionAction - :<|> Named @"on-user-deleted-connections" onUserDeleted :<|> Named @"claim-key-packages" fedClaimKeyPackages :<|> Named @"get-not-fully-connected-backends" getFederationStatus + :<|> Named @"on-user-deleted-connections" onUserDeleted -- Allow remote domains to send their known remote federation instances, and respond -- with the subset of those we aren't connected to. diff --git a/services/brig/src/Brig/Federation/Client.hs b/services/brig/src/Brig/Federation/Client.hs index f0068f6432..87c44ec446 100644 --- a/services/brig/src/Brig/Federation/Client.hs +++ b/services/brig/src/Brig/Federation/Client.hs @@ -152,7 +152,7 @@ notifyUserDeleted self remotes = do Just chanVar -> do enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $ void $ - fedQueueClient @'Brig @"on-user-deleted-connections" notif + fedQueueClient @'OnUserDeletedConnectionsTag notif Nothing -> Log.err $ Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString) diff --git a/services/galley/src/Galley/API/Action.hs b/services/galley/src/Galley/API/Action.hs index e699407568..d8d5c530cd 100644 --- a/services/galley/src/Galley/API/Action.hs +++ b/services/galley/src/Galley/API/Action.hs @@ -883,7 +883,7 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do -- because quid's backend will update local state and notify its users -- itself using the ConversationUpdate returned by this function if notifyOrigDomain || tDomain ruids /= qDomain quid - then fedQueueClient @'Galley @"on-conversation-updated" update $> Nothing + then fedQueueClient @'OnConversationUpdatedTag update $> Nothing else pure (Just update) -- notify local participants and bots diff --git a/services/galley/src/Galley/API/Clients.hs b/services/galley/src/Galley/API/Clients.hs index b264724a04..044447c488 100644 --- a/services/galley/src/Galley/API/Clients.hs +++ b/services/galley/src/Galley/API/Clients.hs @@ -137,5 +137,5 @@ rmClientH (usr ::: cid) = do removeRemoteMLSClients :: Range 1 1000 [Remote ConvId] -> Sem r () removeRemoteMLSClients convIds = do for_ (bucketRemote (fromRange convIds)) $ \remoteConvs -> - let rpc = void $ fedQueueClient @'Galley @"on-client-removed" (ClientRemovedRequest usr cid (tUnqualified remoteConvs)) + let rpc = void $ fedQueueClient @'OnClientRemovedTag (ClientRemovedRequest usr cid (tUnqualified remoteConvs)) in enqueueNotification remoteConvs Q.Persistent rpc diff --git a/services/galley/src/Galley/API/Federation.hs b/services/galley/src/Galley/API/Federation.hs index d7f2e3539a..3a22a033b2 100644 --- a/services/galley/src/Galley/API/Federation.hs +++ b/services/galley/src/Galley/API/Federation.hs @@ -103,24 +103,24 @@ federationSitemap :: federationSitemap = Named @"on-conversation-created" onConversationCreated :<|> Named @"get-conversations" getConversations - :<|> Named @"on-conversation-updated" onConversationUpdated :<|> Named @"leave-conversation" (callsFed (exposeAnnotations leaveConversation)) - :<|> Named @"on-message-sent" onMessageSent :<|> Named @"send-message" (callsFed (exposeAnnotations sendMessage)) - :<|> Named @"on-user-deleted-conversations" (callsFed (exposeAnnotations onUserDeleted)) :<|> Named @"update-conversation" (callsFed (exposeAnnotations updateConversation)) :<|> Named @"mls-welcome" mlsSendWelcome - :<|> Named @"on-mls-message-sent" onMLSMessageSent :<|> Named @"send-mls-message" (callsFed (exposeAnnotations sendMLSMessage)) :<|> Named @"send-mls-commit-bundle" (callsFed (exposeAnnotations sendMLSCommitBundle)) :<|> Named @"query-group-info" queryGroupInfo - :<|> Named @"on-client-removed" (callsFed (exposeAnnotations onClientRemoved)) :<|> Named @"update-typing-indicator" (callsFed (exposeAnnotations updateTypingIndicator)) :<|> Named @"on-typing-indicator-updated" onTypingIndicatorUpdated :<|> Named @"get-sub-conversation" getSubConversationForRemoteUser :<|> Named @"delete-sub-conversation" (callsFed deleteSubConversationForRemoteUser) :<|> Named @"leave-sub-conversation" (callsFed leaveSubConversation) :<|> Named @"get-one2one-conversation" getOne2OneConversation + :<|> Named @"on-client-removed" (callsFed (exposeAnnotations onClientRemoved)) + :<|> Named @"on-message-sent" onMessageSent + :<|> Named @"on-mls-message-sent" onMLSMessageSent + :<|> Named @"on-conversation-updated" onConversationUpdated + :<|> Named @"on-user-deleted-conversations" (callsFed (exposeAnnotations onUserDeleted)) onClientRemoved :: ( Member BackendNotificationQueueAccess r, diff --git a/services/galley/src/Galley/API/Internal.hs b/services/galley/src/Galley/API/Internal.hs index 2830fc16d4..b33bab98ac 100644 --- a/services/galley/src/Galley/API/Internal.hs +++ b/services/galley/src/Galley/API/Internal.hs @@ -420,7 +420,7 @@ rmUser lusr conn = do leaveRemoteConversations cids = for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs)) - let rpc = void $ fedQueueClient @'Galley @"on-user-deleted-conversations" userDelete + let rpc = void $ fedQueueClient @'OnUserDeletedConversationsTag userDelete enqueueNotification remoteConvs Q.Persistent rpc -- FUTUREWORK: Add a retry mechanism if there are federation errrors. diff --git a/services/galley/src/Galley/API/MLS/Propagate.hs b/services/galley/src/Galley/API/MLS/Propagate.hs index e9f6ac089d..6b17a3a8a6 100644 --- a/services/galley/src/Galley/API/MLS/Propagate.hs +++ b/services/galley/src/Galley/API/MLS/Propagate.hs @@ -89,7 +89,7 @@ propagateMessage qusr mSenderClient lConvOrSub con msg cm = do -- send to remotes (either (logRemoteNotificationError @"on-mls-message-sent") (const (pure ())) <=< enqueueNotificationsConcurrently Q.Persistent (map remoteMemberQualify rmems)) $ \rs -> - fedQueueClient @'Galley @"on-mls-message-sent" $ + fedQueueClient @'OnMLSMessageSentTag $ RemoteMLSMessage { time = now, sender = qusr, diff --git a/services/galley/src/Galley/API/Message.hs b/services/galley/src/Galley/API/Message.hs index a78166d3e3..66657736a6 100644 --- a/services/galley/src/Galley/API/Message.hs +++ b/services/galley/src/Galley/API/Message.hs @@ -663,7 +663,7 @@ sendRemoteMessages domain now sender senderClient lcnv metadata messages = (hand transient = mmTransient metadata, recipients = UserClientMap rcpts } - let rpc = void $ fedQueueClient @'Galley @"on-message-sent" rm + let rpc = void $ fedQueueClient @'OnMessageSentTag rm enqueueNotification domain Q.Persistent rpc where handle :: Either FederationError a -> Sem r (Set (UserId, ClientId))