diff --git a/changelog.d/6-federation/on-conversation-updated-async b/changelog.d/6-federation/on-conversation-updated-async new file mode 100644 index 00000000000..24885094cf6 --- /dev/null +++ b/changelog.d/6-federation/on-conversation-updated-async @@ -0,0 +1,3 @@ +The on-conversation-updated notification is now queued instead of being sent directly. A new version of the notification has been introduced with a different JSON format for the body, mostly for testing purposes of the versioning system. + +Since the notification is now sent asynchronously, some error conditions in case of unreachable backends cannot be triggered anymore. diff --git a/changelog.d/6-federation/wpb-183-versioned-async-b2b b/changelog.d/6-federation/wpb-183-versioned-async-b2b new file mode 100644 index 00000000000..c33245c5ccd --- /dev/null +++ b/changelog.d/6-federation/wpb-183-versioned-async-b2b @@ -0,0 +1,2 @@ +Versioning of backend to backend notifications. Notifications are now stored in "bundles" containing a serialised payload for each supported version. The background worker then dynamically selects the best version to use and sends only the notification corresponding to that version. + diff --git a/deploy/dockerephemeral/federation-v0.yaml b/deploy/dockerephemeral/federation-v0.yaml index 1342056cac5..8ed1179b048 100644 --- a/deploy/dockerephemeral/federation-v0.yaml +++ b/deploy/dockerephemeral/federation-v0.yaml @@ -182,6 +182,8 @@ services: networks: - demo_wire - coredns + extra_hosts: + - "host.docker.internal.:host-gateway" ports: - '127.0.0.1:21097:8080' - '127.0.0.1:21098:8081' diff --git a/integration/test/Test/Conversation.hs b/integration/test/Test/Conversation.hs index 9ff4641bbcc..1d5587f40ae 100644 --- a/integration/test/Test/Conversation.hs +++ b/integration/test/Test/Conversation.hs @@ -850,3 +850,17 @@ testGuestLinksExpired = do liftIO $ threadDelay (1_100_000) bindResponse (getJoinCodeConv tm k v) $ \resp -> do resp.status `shouldMatchInt` 404 + +testConversationWithFedV0 :: HasCallStack => App () +testConversationWithFedV0 = do + alice <- randomUser OwnDomain def + bob <- randomUser FedV0Domain def + withAPIVersion 4 $ connectTwoUsers alice bob + + conv <- + postConversation alice (defProteus {qualifiedUsers = [bob]}) + >>= getJSON 201 + + withWebSocket bob $ \ws -> do + void $ changeConversationName alice conv "foobar" >>= getJSON 200 + void $ awaitMatch isConvNameChangeNotif ws diff --git a/integration/test/Testlib/Env.hs b/integration/test/Testlib/Env.hs index f85f1d0934a..4becf8eb9a3 100644 --- a/integration/test/Testlib/Env.hs +++ b/integration/test/Testlib/Env.hs @@ -4,6 +4,7 @@ module Testlib.Env where import Control.Monad.Codensity import Control.Monad.IO.Class +import Control.Monad.Reader import Data.Default import Data.Function ((&)) import Data.Functor @@ -184,3 +185,6 @@ mkMLSState = Codensity $ \k -> ciphersuite = def, protocol = MLSProtocolMLS } + +withAPIVersion :: Int -> App a -> App a +withAPIVersion v = local $ \e -> e {defaultAPIVersion = v} diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API.hs b/libs/wire-api-federation/src/Wire/API/Federation/API.hs index 242991d2c41..053275577e3 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API.hs @@ -24,12 +24,14 @@ module Wire.API.Federation.API HasUnsafeFedEndpoint, fedClient, fedQueueClient, + sendBundle, fedClientIn, unsafeFedClientIn, module Wire.API.MakesFederatedCall, -- * Re-exports Component (..), + makeConversationUpdateBundle, ) where @@ -45,6 +47,7 @@ import Servant.Client.Core import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Cargohold import Wire.API.Federation.API.Galley +import Wire.API.Federation.API.Util import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client import Wire.API.Federation.Component @@ -88,21 +91,21 @@ fedClient :: Client m api fedClient = clientIn (Proxy @api) (Proxy @m) -fedQueueClient :: - forall {k} (tag :: k). - ( HasNotificationEndpoint tag, - KnownSymbol (NotificationPath tag), - KnownComponent (NotificationComponent k), - ToJSON (Payload tag) - ) => - Payload tag -> - FedQueueClient (NotificationComponent k) () -fedQueueClient payload = do +fedClientIn :: + forall (comp :: Component) (name :: Symbol) m api. + (HasFedEndpoint comp api name, HasClient m api) => + Client m api +fedClientIn = clientIn (Proxy @api) (Proxy @m) + +sendBundle :: + KnownComponent c => + PayloadBundle c -> + FedQueueClient c () +sendBundle bundle = do env <- ask - let notif = fedNotifToBackendNotif @tag env.requestId env.originDomain payload - msg = + let msg = newMsg - { msgBody = encode notif, + { msgBody = encode bundle, msgDeliveryMode = Just (env.deliveryMode), msgContentType = Just "application/json" } @@ -112,11 +115,18 @@ fedQueueClient payload = do ensureQueue env.channel env.targetDomain._domainText void $ publishMsg env.channel exchange (routingKey env.targetDomain._domainText) msg -fedClientIn :: - forall (comp :: Component) (name :: Symbol) m api. - (HasFedEndpoint comp api name, HasClient m api) => - Client m api -fedClientIn = clientIn (Proxy @api) (Proxy @m) +fedQueueClient :: + forall {k} (tag :: k) c. + ( HasNotificationEndpoint tag, + HasVersionRange tag, + HasFedPath tag, + KnownComponent (NotificationComponent k), + ToJSON (Payload tag), + c ~ NotificationComponent k + ) => + Payload tag -> + FedQueueClient c () +fedQueueClient payload = sendBundle =<< makeBundle @tag payload -- | Like 'fedClientIn', but doesn't propagate a 'CallsFed' constraint. Intended -- to be used in test situations only. diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs index 9f9e1ee589e..0318e84d666 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Galley/Notifications.hs @@ -34,8 +34,10 @@ import Wire.API.Conversation.Action import Wire.API.Federation.Component import Wire.API.Federation.Endpoint import Wire.API.Federation.HasNotificationEndpoint +import Wire.API.Federation.Version import Wire.API.MLS.SubConversation import Wire.API.Message +import Wire.API.Routes.Version (From, Until) import Wire.API.Util.Aeson import Wire.Arbitrary @@ -43,6 +45,7 @@ data GalleyNotificationTag = OnClientRemovedTag | OnMessageSentTag | OnMLSMessageSentTag + | OnConversationUpdatedTagV0 | OnConversationUpdatedTag | OnUserDeletedConversationsTag deriving (Show, Eq, Generic, Bounded, Enum) @@ -66,9 +69,16 @@ instance HasNotificationEndpoint 'OnMLSMessageSentTag where -- used by the backend that owns a conversation to inform this backend of -- changes to the conversation +instance HasNotificationEndpoint 'OnConversationUpdatedTagV0 where + type Payload 'OnConversationUpdatedTagV0 = ConversationUpdateV0 + type NotificationPath 'OnConversationUpdatedTagV0 = "on-conversation-updated" + type NotificationVersionTag 'OnConversationUpdatedTagV0 = 'Just 'V0 + type NotificationMods 'OnConversationUpdatedTagV0 = '[Until 'V1] + instance HasNotificationEndpoint 'OnConversationUpdatedTag where type Payload 'OnConversationUpdatedTag = ConversationUpdate type NotificationPath 'OnConversationUpdatedTag = "on-conversation-updated" + type NotificationMods 'OnConversationUpdatedTag = '[From 'V1] instance HasNotificationEndpoint 'OnUserDeletedConversationsTag where type Payload 'OnUserDeletedConversationsTag = UserDeletedConversationsNotification @@ -79,6 +89,7 @@ type GalleyNotificationAPI = NotificationFedEndpoint 'OnClientRemovedTag :<|> NotificationFedEndpoint 'OnMessageSentTag :<|> NotificationFedEndpoint 'OnMLSMessageSentTag + :<|> NotificationFedEndpoint 'OnConversationUpdatedTagV0 :<|> NotificationFedEndpoint 'OnConversationUpdatedTag :<|> NotificationFedEndpoint 'OnUserDeletedConversationsTag @@ -129,7 +140,7 @@ data RemoteMLSMessage = RemoteMLSMessage instance ToSchema RemoteMLSMessage -data ConversationUpdate = ConversationUpdate +data ConversationUpdateV0 = ConversationUpdateV0 { cuTime :: UTCTime, cuOrigUserId :: Qualified UserId, -- | The unqualified ID of the conversation where the update is happening. @@ -147,12 +158,56 @@ data ConversationUpdate = ConversationUpdate } deriving (Eq, Show, Generic) +instance ToJSON ConversationUpdateV0 + +instance FromJSON ConversationUpdateV0 + +instance ToSchema ConversationUpdateV0 + +data ConversationUpdate = ConversationUpdate + { time :: UTCTime, + origUserId :: Qualified UserId, + -- | The unqualified ID of the conversation where the update is happening. + -- The ID is local to the sender to prevent putting arbitrary domain that + -- is different than that of the backend making a conversation membership + -- update request. + convId :: ConvId, + -- | A list of users from the receiving backend that need to be sent + -- notifications about this change. This is required as we do not expect a + -- non-conversation owning backend to have an indexed mapping of + -- conversation to users. + alreadyPresentUsers :: [UserId], + -- | Information on the specific action that caused the update. + action :: SomeConversationAction + } + deriving (Eq, Show, Generic) + instance ToJSON ConversationUpdate instance FromJSON ConversationUpdate instance ToSchema ConversationUpdate +conversationUpdateToV0 :: ConversationUpdate -> ConversationUpdateV0 +conversationUpdateToV0 cu = + ConversationUpdateV0 + { cuTime = cu.time, + cuOrigUserId = cu.origUserId, + cuConvId = cu.convId, + cuAlreadyPresentUsers = cu.alreadyPresentUsers, + cuAction = cu.action + } + +conversationUpdateFromV0 :: ConversationUpdateV0 -> ConversationUpdate +conversationUpdateFromV0 cu = + ConversationUpdate + { time = cu.cuTime, + origUserId = cu.cuOrigUserId, + convId = cu.cuConvId, + alreadyPresentUsers = cu.cuAlreadyPresentUsers, + action = cu.cuAction + } + type UserDeletedNotificationMaxConvs = 1000 data UserDeletedConversationsNotification = UserDeletedConversationsNotification diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API/Util.hs b/libs/wire-api-federation/src/Wire/API/Federation/API/Util.hs new file mode 100644 index 00000000000..d855c2abb01 --- /dev/null +++ b/libs/wire-api-federation/src/Wire/API/Federation/API/Util.hs @@ -0,0 +1,29 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.API.Federation.API.Util where + +import Imports +import Wire.API.Federation.API.Galley.Notifications +import Wire.API.Federation.BackendNotifications +import Wire.API.Federation.Component + +makeConversationUpdateBundle :: + ConversationUpdate -> + FedQueueClient 'Galley (PayloadBundle 'Galley) +makeConversationUpdateBundle update = + (<>) <$> makeBundle update <*> makeBundle (conversationUpdateToV0 update) diff --git a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs index b3cb2546ab4..43849c716da 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs @@ -4,11 +4,14 @@ module Wire.API.Federation.BackendNotifications where import Control.Exception +import Control.Monad.Codensity import Control.Monad.Except -import Data.Aeson +import Data.Aeson qualified as A import Data.Domain import Data.Id (RequestId) +import Data.List.NonEmpty qualified as NE import Data.Map qualified as Map +import Data.Schema import Data.Text qualified as Text import Data.Text.Lazy.Encoding qualified as TL import Imports @@ -20,6 +23,8 @@ import Wire.API.Federation.API.Common import Wire.API.Federation.Client import Wire.API.Federation.Component import Wire.API.Federation.Error +import Wire.API.Federation.HasNotificationEndpoint +import Wire.API.Federation.Version import Wire.API.RawJson -- | NOTE: Stored in RabbitMQ, any changes to serialization of this object could cause @@ -33,46 +38,115 @@ data BackendNotification = BackendNotification -- pusher. This also makes development less clunky as we don't have to -- create a sum type here for all types of notifications that could exist. body :: RawJson, + -- | The federation API versions that the 'body' corresponds to. The field + -- is optional so that messages already in the queue are not lost. + bodyVersions :: Maybe VersionRange, requestId :: Maybe RequestId } deriving (Show, Eq) - -instance ToJSON BackendNotification where - toJSON notif = - object - [ "ownDomain" .= notif.ownDomain, - "targetComponent" .= notif.targetComponent, - "path" .= notif.path, - "body" .= TL.decodeUtf8 notif.body.rawJsonBytes, - "requestId" .= notif.requestId - ] - -instance FromJSON BackendNotification where - parseJSON = withObject "BackendNotification" $ \o -> - BackendNotification - <$> o .: "ownDomain" - <*> o .: "targetComponent" - <*> o .: "path" - <*> (RawJson . TL.encodeUtf8 <$> o .: "body") - <*> o .:? "requestId" + deriving (A.ToJSON, A.FromJSON) via (Schema BackendNotification) + +instance ToSchema BackendNotification where + schema = + object "BackendNotification" $ + BackendNotification + <$> ownDomain .= field "ownDomain" schema + <*> targetComponent .= field "targetComponent" schema + <*> path .= field "path" schema + <*> (TL.decodeUtf8 . rawJsonBytes . body) + .= field "body" (RawJson . TL.encodeUtf8 <$> schema) + <*> bodyVersions .= maybe_ (optField "bodyVersions" schema) + <*> (.requestId) .= maybe_ (optField "requestId" schema) + +-- | Convert a federation endpoint to a backend notification to be enqueued to a +-- RabbitMQ queue. +fedNotifToBackendNotif :: + forall {k} (tag :: k). + ( HasFedPath tag, + HasVersionRange tag, + KnownComponent (NotificationComponent k), + A.ToJSON (Payload tag) + ) => + RequestId -> + Domain -> + Payload tag -> + BackendNotification +fedNotifToBackendNotif rid ownDomain payload = + let p = Text.pack $ fedPath @tag + b = RawJson . A.encode $ payload + in toNotif p b + where + toNotif :: Text -> RawJson -> BackendNotification + toNotif path body = + BackendNotification + { ownDomain = ownDomain, + targetComponent = componentVal @(NotificationComponent k), + path = path, + body = body, + bodyVersions = Just $ versionRange @tag, + requestId = Just rid + } + +newtype PayloadBundle (c :: Component) = PayloadBundle + { notifications :: NE.NonEmpty BackendNotification + } + deriving (A.ToJSON, A.FromJSON) via (Schema (PayloadBundle c)) + deriving newtype (Semigroup) + +instance ToSchema (PayloadBundle c) where + schema = + object "PayloadBundle" $ + PayloadBundle + <$> notifications .= field "notifications" (nonEmptyArray schema) + +toBundle :: + forall {k} (tag :: k). + ( HasFedPath tag, + HasVersionRange tag, + KnownComponent (NotificationComponent k), + A.ToJSON (Payload tag) + ) => + RequestId -> + -- | The origin domain + Domain -> + Payload tag -> + PayloadBundle (NotificationComponent k) +toBundle reqId originDomain payload = + let notif = fedNotifToBackendNotif @tag reqId originDomain payload + in PayloadBundle . pure $ notif + +makeBundle :: + forall {k} (tag :: k) c. + ( HasFedPath tag, + HasVersionRange tag, + KnownComponent (NotificationComponent k), + A.ToJSON (Payload tag), + c ~ NotificationComponent k + ) => + Payload tag -> + FedQueueClient c (PayloadBundle c) +makeBundle payload = do + reqId <- asks (.requestId) + origin <- asks (.originDomain) + pure $ toBundle @tag reqId origin payload type BackendNotificationAPI = Capture "name" Text :> ReqBody '[JSON] RawJson :> Post '[JSON] EmptyResponse -sendNotification :: FederatorClientEnv -> Component -> Text -> RawJson -> IO (Either FederatorClientError ()) -sendNotification env component path body = - case component of - Brig -> go @'Brig - Galley -> go @'Galley - Cargohold -> go @'Cargohold +sendNotification :: FederatorClientVersionedEnv -> Component -> Text -> RawJson -> IO (Either FederatorClientError ()) +sendNotification env component path body = case someComponent component of + SomeComponent p -> go p where withoutFirstSlash :: Text -> Text withoutFirstSlash (Text.stripPrefix "/" -> Just t) = t withoutFirstSlash t = t - go :: forall c. (KnownComponent c) => IO (Either FederatorClientError ()) - go = - runFederatorClient env . void $ - clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body + go :: forall c. KnownComponent c => Proxy c -> IO (Either FederatorClientError ()) + go _ = + lowerCodensity + . runExceptT + . runVersionedFederatorClientToCodensity env + . void + $ clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body enqueue :: Q.Channel -> RequestId -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a enqueue channel requestId originDomain targetDomain deliveryMode (FedQueueClient action) = diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs index 648a4ee3ec6..37444a6a49e 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs @@ -21,8 +21,10 @@ module Wire.API.Federation.Client ( FederatorClientEnv (..), FederatorClientVersionedEnv (..), + unversionedEnv, FederatorClient, runFederatorClient, + runVersionedFederatorClient, runFederatorClientToCodensity, runVersionedFederatorClientToCodensity, performHTTP2Request, @@ -85,6 +87,9 @@ data FederatorClientVersionedEnv = FederatorClientVersionedEnv cveVersion :: Maybe Version } +unversionedEnv :: FederatorClientEnv -> FederatorClientVersionedEnv +unversionedEnv env = FederatorClientVersionedEnv env Nothing + -- | A request to a remote backend. The API version of the remote backend is in -- the environment. The 'MaybeT' layer is used to match endpoint versions (via -- the 'Alternative' and 'VersionedMonad' instances). @@ -171,7 +176,16 @@ instance KnownComponent c => RunClient (FederatorClient c) where expectedStatuses v <- asks cveVersion - let vreq = req {requestHeaders = (versionHeader, toByteString' (versionInt (fromMaybe V0 v))) :<| requestHeaders req} + let vreq = + req + { requestHeaders = + ( versionHeader, + toByteString' + ( versionInt (fromMaybe V0 v) + ) + ) + :<| requestHeaders req + } withHTTP2StreamingRequest successfulStatus vreq $ \resp -> do bdy <- @@ -297,6 +311,15 @@ runFederatorClient env = lowerCodensity . runFederatorClientToCodensity env +runVersionedFederatorClient :: + FederatorClientVersionedEnv -> + FederatorClient c a -> + IO (Either FederatorClientError a) +runVersionedFederatorClient venv = + lowerCodensity + . runExceptT + . runVersionedFederatorClientToCodensity venv + runFederatorClientToCodensity :: forall c a. FederatorClientEnv -> @@ -306,7 +329,7 @@ runFederatorClientToCodensity env action = runExceptT $ do v <- runVersionedFederatorClientToCodensity (FederatorClientVersionedEnv env Nothing) - versionNegotiation + (versionNegotiation supportedVersions) runVersionedFederatorClientToCodensity @c (FederatorClientVersionedEnv env (Just v)) action @@ -323,8 +346,8 @@ runVersionedFederatorClientToCodensity env = where unmaybe = (maybe (E.throw FederatorClientVersionMismatch) pure =<<) -versionNegotiation :: FederatorClient 'Brig Version -versionNegotiation = +versionNegotiation :: Set Version -> FederatorClient 'Brig Version +versionNegotiation localVersions = let req = defaultRequest { requestPath = "/api-version", @@ -334,13 +357,15 @@ versionNegotiation = } in withHTTP2StreamingRequest @'Brig HTTP.statusIsSuccessful req $ \resp -> do body <- toLazyByteString <$> streamingResponseStrictBody resp - remoteVersions <- case Aeson.decode body of + allRemoteVersions <- case Aeson.decode body of Nothing -> E.throw (FederatorClientVersionNegotiationError InvalidVersionInfo) - Just info -> pure (Set.fromList (vinfoSupported info)) - case Set.lookupMax (Set.intersection remoteVersions supportedVersions) of + Just info -> pure (vinfoSupported info) + -- ignore versions that don't even exist locally + let remoteVersions = Set.fromList $ Imports.mapMaybe intToVersion allRemoteVersions + case Set.lookupMax (Set.intersection remoteVersions localVersions) of Just v -> pure v Nothing -> E.throw . FederatorClientVersionNegotiationError $ - if Set.lookupMax supportedVersions > Set.lookupMax remoteVersions + if Set.lookupMax localVersions > Set.lookupMax remoteVersions then RemoteTooOld else RemoteTooNew diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Component.hs b/libs/wire-api-federation/src/Wire/API/Federation/Component.hs index 73595904f7c..1a5b91e6bd3 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Component.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Component.hs @@ -21,6 +21,7 @@ module Wire.API.Federation.Component ) where +import Data.Proxy import Imports import Wire.API.MakesFederatedCall (Component (..)) @@ -46,3 +47,11 @@ instance KnownComponent 'Galley where instance KnownComponent 'Cargohold where componentVal = Cargohold + +data SomeComponent where + SomeComponent :: KnownComponent c => Proxy c -> SomeComponent + +someComponent :: Component -> SomeComponent +someComponent Brig = SomeComponent (Proxy @'Brig) +someComponent Galley = SomeComponent (Proxy @'Galley) +someComponent Cargohold = SomeComponent (Proxy @'Cargohold) diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs b/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs index e656a3eda2f..f24085139cb 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs @@ -23,6 +23,7 @@ where import Data.Kind import GHC.TypeLits +import Imports import Servant.API import Wire.API.ApplyMods import Wire.API.Federation.API.Common @@ -41,21 +42,29 @@ type instance FedPath (name :: Symbol) = name type instance FedPath (Versioned v name) = name +type UnnamedFedEndpointWithMods (mods :: [Type]) path input output = + ( ApplyMods + mods + (path :> OriginDomainHeader :> ReqBody '[JSON] input :> Post '[JSON] output) + ) + type FedEndpointWithMods (mods :: [Type]) name input output = Named name - ( ApplyMods - mods - (FedPath name :> OriginDomainHeader :> ReqBody '[JSON] input :> Post '[JSON] output) + ( UnnamedFedEndpointWithMods mods (FedPath name) input output ) -type NotificationFedEndpointWithMods (mods :: [Type]) name input = - FedEndpointWithMods mods name input EmptyResponse - type FedEndpoint name input output = FedEndpointWithMods '[] name input output +type NotificationFedEndpointWithMods (mods :: [Type]) name path input = + Named name (UnnamedFedEndpointWithMods mods path input EmptyResponse) + type NotificationFedEndpoint tag = - FedEndpoint (NotificationPath tag) (Payload tag) EmptyResponse + MkNotificationFedEndpoint + (NotificationMods tag) + (NotificationPath tag) + (NotificationVersionTag tag) + (Payload tag) type StreamingFedEndpoint name input output = Named @@ -65,3 +74,18 @@ type StreamingFedEndpoint name input output = :> ReqBody '[JSON] input :> StreamPost NoFraming OctetStream output ) + +type family + MkNotificationFedEndpoint + (m :: [Type]) + (s :: Symbol) + (v :: Maybe k) + (p :: Type) + +type instance + MkNotificationFedEndpoint m s 'Nothing p = + NotificationFedEndpointWithMods m s s p + +type instance + MkNotificationFedEndpoint m s ('Just v) p = + NotificationFedEndpointWithMods m (Versioned v s) s p diff --git a/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs b/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs index c2f5772a255..7fba640ee90 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/HasNotificationEndpoint.hs @@ -15,53 +15,78 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.API.Federation.HasNotificationEndpoint where +module Wire.API.Federation.HasNotificationEndpoint + ( IsNotificationTag (..), + HasNotificationEndpoint (..), + HasFedPath, + HasVersionRange, + fedPath, + versionRange, + ) +where -import Data.Aeson -import Data.Domain -import Data.Id import Data.Kind import Data.Proxy -import Data.Text qualified as T +import Data.Singletons import GHC.TypeLits import Imports -import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Component -import Wire.API.RawJson +import Wire.API.Federation.Version +import Wire.API.Routes.Version (From, Until) class IsNotificationTag k where type NotificationComponent k = (c :: Component) | c -> k class HasNotificationEndpoint t where -- | The type of the payload for this endpoint - type Payload t :: Type + type Payload t = (p :: Type) | p -> t -- | The central path component of a notification endpoint, e.g., -- "on-conversation-updated". type NotificationPath t :: Symbol --- | Convert a federation endpoint to a backend notification to be enqueued to a --- RabbitMQ queue. -fedNotifToBackendNotif :: - forall {k} (tag :: k). - KnownSymbol (NotificationPath tag) => - KnownComponent (NotificationComponent k) => - ToJSON (Payload tag) => - RequestId -> - Domain -> - Payload tag -> - BackendNotification -fedNotifToBackendNotif rid ownDomain payload = - let p = T.pack . symbolVal $ Proxy @(NotificationPath tag) - b = RawJson . encode $ payload - in toNotif p b + -- | An optional version tag to distinguish different versions of the same + -- endpoint. + type NotificationVersionTag t :: Maybe Version + + type NotificationVersionTag t = 'Nothing + + type NotificationMods t :: [Type] + + type NotificationMods t = '[] + +type HasFedPath t = KnownSymbol (NotificationPath t) + +type HasVersionRange t = MkVersionRange (NotificationMods t) + +fedPath :: forall t. HasFedPath t => String +fedPath = symbolVal (Proxy @(NotificationPath t)) + +-- | Build a version range using any 'Until' and 'From' combinators present in +-- the endpoint modifiers. +class MkVersionRange mods where + mkVersionRange :: VersionRange + +instance MkVersionRange '[] where + mkVersionRange = allVersions + +instance + {-# OVERLAPPING #-} + (MkVersionRange mods, SingI v) => + MkVersionRange (From (v :: Version) ': mods) + where + mkVersionRange = mkVersionRange @mods <> rangeFromVersion (demote @v) + +instance + {-# OVERLAPPING #-} + (MkVersionRange mods, SingI v) => + MkVersionRange (Until (v :: Version) ': mods) where - toNotif :: Text -> RawJson -> BackendNotification - toNotif path body = - BackendNotification - { ownDomain = ownDomain, - targetComponent = componentVal @(NotificationComponent k), - path = path, - body = body, - requestId = Just rid - } + mkVersionRange = mkVersionRange @mods <> rangeUntilVersion (demote @v) + +instance {-# OVERLAPPABLE #-} MkVersionRange mods => MkVersionRange (m ': mods) where + mkVersionRange = mkVersionRange @mods + +-- | The federation API version range this endpoint is supported in. +versionRange :: forall t. HasVersionRange t => VersionRange +versionRange = mkVersionRange @(NotificationMods t) diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Version.hs b/libs/wire-api-federation/src/Wire/API/Federation/Version.hs index b1e29cf520e..a9055c7384b 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Version.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Version.hs @@ -17,16 +17,37 @@ -- -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.API.Federation.Version where +module Wire.API.Federation.Version + ( -- * Version, VersionInfo + Version (..), + V0Sym0, + V1Sym0, + intToVersion, + versionInt, + supportedVersions, + VersionInfo (..), + versionInfo, -import Control.Lens ((?~)) + -- * VersionRange + VersionUpperBound (..), + VersionRange (..), + fromVersion, + toVersionExcl, + allVersions, + latestCommonVersion, + rangeFromVersion, + rangeUntilVersion, + enumVersionRange, + ) +where + +import Control.Lens (makeLenses, (?~)) import Data.Aeson (FromJSON (..), ToJSON (..)) import Data.OpenApi qualified as S import Data.Schema import Data.Set qualified as Set import Data.Singletons.Base.TH import Imports -import Wire.API.VersionInfo data Version = V0 | V1 deriving stock (Eq, Ord, Bounded, Enum, Show, Generic) @@ -36,6 +57,9 @@ versionInt :: Version -> Int versionInt V0 = 0 versionInt V1 = 1 +intToVersion :: Int -> Maybe Version +intToVersion intV = find (\v -> versionInt v == intV) [minBound ..] + instance ToSchema Version where schema = enum @Integer "Version" . mconcat $ @@ -47,7 +71,7 @@ supportedVersions :: Set Version supportedVersions = Set.fromList [minBound .. maxBound] data VersionInfo = VersionInfo - { vinfoSupported :: [Version] + { vinfoSupported :: [Int] } deriving (FromJSON, ToJSON, S.ToSchema) via (Schema VersionInfo) @@ -55,16 +79,108 @@ instance ToSchema VersionInfo where schema = objectWithDocModifier "VersionInfo" (S.schema . S.example ?~ toJSON example) $ VersionInfo - <$> vinfoSupported .= vinfoObjectSchema schema + -- if the supported_versions field does not exist, assume an old backend + -- that only supports V0 + <$> vinfoSupported + .= fmap + (fromMaybe [0]) + (optField "supported_versions" (array schema)) + -- legacy field to support older versions of the backend with broken + -- version negotiation + <* const [0 :: Int, 1] .= field "supported" (array schema) where example :: VersionInfo example = VersionInfo - { vinfoSupported = toList supportedVersions + { vinfoSupported = map versionInt (toList supportedVersions) } versionInfo :: VersionInfo -versionInfo = VersionInfo (toList supportedVersions) +versionInfo = VersionInfo (map versionInt (toList supportedVersions)) + +---------------------------------------------------------------------- + +-- | The upper bound of a version range. +-- +-- The order of constructors here makes the 'Unbounded' value maximum in the +-- generated lexicographic ordering. +data VersionUpperBound = VersionUpperBound Version | Unbounded + deriving (Eq, Ord, Show) + +versionFromUpperBound :: VersionUpperBound -> Maybe Version +versionFromUpperBound (VersionUpperBound v) = Just v +versionFromUpperBound Unbounded = Nothing + +versionToUpperBound :: Maybe Version -> VersionUpperBound +versionToUpperBound (Just v) = VersionUpperBound v +versionToUpperBound Nothing = Unbounded + +data VersionRange = VersionRange + { _fromVersion :: Version, + _toVersionExcl :: VersionUpperBound + } + +deriving instance Eq VersionRange + +deriving instance Show VersionRange + +deriving instance Ord VersionRange + +makeLenses ''VersionRange + +instance ToSchema VersionRange where + schema = + object "VersionRange" $ + VersionRange + <$> _fromVersion .= field "from" schema + <*> (versionFromUpperBound . _toVersionExcl) + .= maybe_ (versionToUpperBound <$> optFieldWithDocModifier "until_excl" desc schema) + where + desc = description ?~ "exlusive upper version bound" + +deriving via Schema VersionRange instance ToJSON VersionRange + +deriving via Schema VersionRange instance FromJSON VersionRange + +allVersions :: VersionRange +allVersions = VersionRange minBound Unbounded + +-- | The semigroup instance of VersionRange is intersection. +instance Semigroup VersionRange where + VersionRange from1 to1 <> VersionRange from2 to2 = + VersionRange (max from1 from2) (min to1 to2) + +inVersionRange :: VersionRange -> Version -> Bool +inVersionRange (VersionRange a b) v = + v >= a && VersionUpperBound v < b + +rangeFromVersion :: Version -> VersionRange +rangeFromVersion v = VersionRange v Unbounded + +rangeUntilVersion :: Version -> VersionRange +rangeUntilVersion v = VersionRange minBound (VersionUpperBound v) + +enumVersionRange :: VersionRange -> Set Version +enumVersionRange = + Set.fromList . \case + VersionRange l Unbounded -> [l ..] + VersionRange l (VersionUpperBound u) -> init [l .. u] + +-- | For a version range of a local backend and for a set of versions that a +-- remote backend supports, compute the newest version supported by both. The +-- remote versions are given as integers as the range of versions supported by +-- the remote backend can include a version unknown to the local backend. If +-- there is no version in common, the return value is 'Nothing'. +latestCommonVersion :: Foldable f => VersionRange -> f Int -> Maybe Version +latestCommonVersion localVersions = + safeMaximum + . filter (inVersionRange localVersions) + . mapMaybe intToVersion + . toList + +safeMaximum :: Ord a => [a] -> Maybe a +safeMaximum [] = Nothing +safeMaximum as = Just (maximum as) $(genSingletons [''Version]) diff --git a/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/ConversationUpdate.hs b/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/ConversationUpdate.hs index 3e8635f9851..568e6533b67 100644 --- a/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/ConversationUpdate.hs +++ b/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/ConversationUpdate.hs @@ -16,7 +16,9 @@ -- with this program. If not, see . module Test.Wire.API.Federation.Golden.ConversationUpdate - ( testObject_ConversationUpdate1, + ( testObject_ConversationUpdate1V0, + testObject_ConversationUpdate2V0, + testObject_ConversationUpdate1, testObject_ConversationUpdate2, ) where @@ -31,7 +33,7 @@ import Imports import Wire.API.Conversation import Wire.API.Conversation.Action import Wire.API.Conversation.Role (roleNameWireAdmin) -import Wire.API.Federation.API.Galley (ConversationUpdate (..)) +import Wire.API.Federation.API.Galley qAlice, qBob :: Qualified UserId qAlice = @@ -47,9 +49,9 @@ chad, dee :: UserId chad = Id (fromJust (UUID.fromString "00000fff-0000-0000-0000-000100005007")) dee = Id (fromJust (UUID.fromString "00000fff-0000-aaaa-0000-000100005007")) -testObject_ConversationUpdate1 :: ConversationUpdate -testObject_ConversationUpdate1 = - ConversationUpdate +testObject_ConversationUpdate1V0 :: ConversationUpdateV0 +testObject_ConversationUpdate1V0 = + ConversationUpdateV0 { cuTime = read "1864-04-12 12:22:43.673 UTC", cuOrigUserId = Qualified @@ -61,9 +63,9 @@ testObject_ConversationUpdate1 = cuAction = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qAlice :| [qBob]) roleNameWireAdmin) } -testObject_ConversationUpdate2 :: ConversationUpdate -testObject_ConversationUpdate2 = - ConversationUpdate +testObject_ConversationUpdate2V0 :: ConversationUpdateV0 +testObject_ConversationUpdate2V0 = + ConversationUpdateV0 { cuTime = read "1864-04-12 12:22:43.673 UTC", cuOrigUserId = Qualified @@ -74,3 +76,31 @@ testObject_ConversationUpdate2 = cuAlreadyPresentUsers = [chad, dee], cuAction = SomeConversationAction (sing @'ConversationLeaveTag) () } + +testObject_ConversationUpdate1 :: ConversationUpdate +testObject_ConversationUpdate1 = + ConversationUpdate + { time = read "1864-04-12 12:22:43.673 UTC", + origUserId = + Qualified + (Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000007"))) + (Domain "golden.example.com"), + convId = + Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000006")), + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qAlice :| [qBob]) roleNameWireAdmin) + } + +testObject_ConversationUpdate2 :: ConversationUpdate +testObject_ConversationUpdate2 = + ConversationUpdate + { time = read "1864-04-12 12:22:43.673 UTC", + origUserId = + Qualified + (Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000007"))) + (Domain "golden.example.com"), + convId = + Id (fromJust (UUID.fromString "00000000-0000-0000-0000-000100000006")), + alreadyPresentUsers = [chad, dee], + action = SomeConversationAction (sing @'ConversationLeaveTag) () + } diff --git a/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/GoldenSpec.hs b/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/GoldenSpec.hs index b436775494e..b691cd8e962 100644 --- a/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/GoldenSpec.hs +++ b/libs/wire-api-federation/test/Test/Wire/API/Federation/Golden/GoldenSpec.hs @@ -46,6 +46,10 @@ spec = (MLSMessageSendingStatus.testObject_MLSMessageSendingStatus3, "testObject_MLSMessageSendingStatus3.json") ] testObjects [(LeaveConversationRequest.testObject_LeaveConversationRequest1, "testObject_LeaveConversationRequest1.json")] + testObjects + [ (ConversationUpdate.testObject_ConversationUpdate1V0, "testObject_ConversationUpdate1V0.json"), + (ConversationUpdate.testObject_ConversationUpdate2V0, "testObject_ConversationUpdate2V0.json") + ] testObjects [ (ConversationUpdate.testObject_ConversationUpdate1, "testObject_ConversationUpdate1.json"), (ConversationUpdate.testObject_ConversationUpdate2, "testObject_ConversationUpdate2.json") diff --git a/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1.json b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1.json index 0c5ff9a27f2..a559d4197e5 100644 --- a/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1.json +++ b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1.json @@ -1,5 +1,5 @@ { - "cuAction": { + "action": { "action": { "role": "wire_admin", "users": [ @@ -15,11 +15,11 @@ }, "tag": "ConversationJoinTag" }, - "cuAlreadyPresentUsers": [], - "cuConvId": "00000000-0000-0000-0000-000100000006", - "cuOrigUserId": { + "alreadyPresentUsers": [], + "convId": "00000000-0000-0000-0000-000100000006", + "origUserId": { "domain": "golden.example.com", "id": "00000000-0000-0000-0000-000100000007" }, - "cuTime": "1864-04-12T12:22:43.673Z" -} \ No newline at end of file + "time": "1864-04-12T12:22:43.673Z" +} diff --git a/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1V0.json b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1V0.json new file mode 100644 index 00000000000..89e99c41c09 --- /dev/null +++ b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate1V0.json @@ -0,0 +1,26 @@ +{ + "cuAction": { + "action": { + "role": "wire_admin", + "users": [ + { + "domain": "golden.example.com", + "id": "00000000-0000-0000-0000-000100004007" + }, + { + "domain": "golden2.example.com", + "id": "00000000-0000-0000-0000-000100005007" + } + ] + }, + "tag": "ConversationJoinTag" + }, + "cuAlreadyPresentUsers": [], + "cuConvId": "00000000-0000-0000-0000-000100000006", + "cuOrigUserId": { + "domain": "golden.example.com", + "id": "00000000-0000-0000-0000-000100000007" + }, + "cuTime": "1864-04-12T12:22:43.673Z" +} + diff --git a/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2.json b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2.json index 8b443934beb..fea5fc43ecb 100644 --- a/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2.json +++ b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2.json @@ -1,16 +1,16 @@ { - "cuAction": { + "action": { "action": {}, "tag": "ConversationLeaveTag" }, - "cuAlreadyPresentUsers": [ + "alreadyPresentUsers": [ "00000fff-0000-0000-0000-000100005007", "00000fff-0000-aaaa-0000-000100005007" ], - "cuConvId": "00000000-0000-0000-0000-000100000006", - "cuOrigUserId": { + "convId": "00000000-0000-0000-0000-000100000006", + "origUserId": { "domain": "golden.example.com", "id": "00000000-0000-0000-0000-000100000007" }, - "cuTime": "1864-04-12T12:22:43.673Z" -} \ No newline at end of file + "time": "1864-04-12T12:22:43.673Z" +} diff --git a/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2V0.json b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2V0.json new file mode 100644 index 00000000000..df533d7bad9 --- /dev/null +++ b/libs/wire-api-federation/test/golden/testObject_ConversationUpdate2V0.json @@ -0,0 +1,17 @@ +{ + "cuAction": { + "action": {}, + "tag": "ConversationLeaveTag" + }, + "cuAlreadyPresentUsers": [ + "00000fff-0000-0000-0000-000100005007", + "00000fff-0000-aaaa-0000-000100005007" + ], + "cuConvId": "00000000-0000-0000-0000-000100000006", + "cuOrigUserId": { + "domain": "golden.example.com", + "id": "00000000-0000-0000-0000-000100000007" + }, + "cuTime": "1864-04-12T12:22:43.673Z" +} + diff --git a/libs/wire-api-federation/wire-api-federation.cabal b/libs/wire-api-federation/wire-api-federation.cabal index d92f39c0792..f27c2fb751b 100644 --- a/libs/wire-api-federation/wire-api-federation.cabal +++ b/libs/wire-api-federation/wire-api-federation.cabal @@ -23,6 +23,7 @@ library Wire.API.Federation.API.Common Wire.API.Federation.API.Galley Wire.API.Federation.API.Galley.Notifications + Wire.API.Federation.API.Util Wire.API.Federation.BackendNotifications Wire.API.Federation.Client Wire.API.Federation.Component diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index 1eb4df1229d..36e566299da 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -29,6 +29,7 @@ library build-depends: aeson , amqp + , base , containers , exceptions , extended @@ -49,6 +50,7 @@ library , types-common , unliftio , wai-utilities + , wire-api , wire-api-federation default-extensions: @@ -176,6 +178,7 @@ test-suite background-worker-test , base , bytestring , containers + , data-default , extended , federator , hspec diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix index 910b9a396dd..31ce1fae0eb 100644 --- a/services/background-worker/default.nix +++ b/services/background-worker/default.nix @@ -8,6 +8,7 @@ , base , bytestring , containers +, data-default , exceptions , extended , federator @@ -50,6 +51,7 @@ mkDerivation { libraryHaskellDepends = [ aeson amqp + base containers exceptions extended @@ -70,6 +72,7 @@ mkDerivation { types-common unliftio wai-utilities + wire-api wire-api-federation ]; executableHaskellDepends = [ HsOpenSSL imports types-common ]; @@ -79,6 +82,7 @@ mkDerivation { base bytestring containers + data-default extended federator hspec diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs index 1fb6721eb58..7dfad1390f1 100644 --- a/services/background-worker/src/Wire/BackendNotificationPusher.hs +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -3,11 +3,13 @@ module Wire.BackendNotificationPusher where +import Control.Arrow import Control.Monad.Catch import Control.Retry import Data.Aeson qualified as A import Data.Domain import Data.Id +import Data.List.NonEmpty qualified as NE import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Data.Text qualified as Text @@ -19,8 +21,12 @@ import Network.RabbitMqAdmin import Prometheus import System.Logger.Class qualified as Log import UnliftIO +import Wire.API.Federation.API import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client +import Wire.API.Federation.Error +import Wire.API.Federation.Version +import Wire.API.RawJson import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util @@ -78,32 +84,115 @@ pushNotification runningFlag targetDomain (msg, envelope) = do UnliftIO.bracket_ (takeMVar runningFlag) (putMVar runningFlag ()) go where go :: AppT IO () - go = case A.eitherDecode @BackendNotification (Q.msgBody msg) of + go = case A.eitherDecode @(PayloadBundle _) (Q.msgBody msg) of Left e -> do - Log.err $ - Log.msg (Log.val "Failed to parse notification, the notification will be ignored") - . Log.field "domain" (domainText targetDomain) - . Log.field "error" e + case A.eitherDecode @BackendNotification (Q.msgBody msg) of + Left eBN -> do + Log.err $ + Log.msg + ( Log.val "Cannot parse a queued message as s notification " + <> "nor as a bundle; the message will be ignored" + ) + . Log.field "domain" (domainText targetDomain) + . Log.field "error-notification" eBN + . Log.field + "error-bundle" + e + -- FUTUREWORK: This rejects the message without any requeueing. This is + -- dangerous as it could happen that a new type of notification is + -- introduced and an old instance of this worker is running, in which case + -- the notification will just get dropped. On the other hand not dropping + -- this message blocks the whole queue. Perhaps there is a better way to + -- deal with this. + lift $ reject envelope False + Right notif -> do + -- FUTUREWORK: Drop support for parsing it as a + -- single notification as soon as we can guarantee + -- that the message queue does not contain any + -- 'BackendNotification's anymore. + ceFederator <- asks (.federatorInternal) + ceHttp2Manager <- asks http2Manager + let ceOriginDomain = notif.ownDomain + ceTargetDomain = targetDomain + ceOriginRequestId = fromMaybe (RequestId "N/A") notif.requestId + cveEnv = FederatorClientEnv {..} + cveVersion = Just V0 -- V0 is assumed for non-versioned queue messages + fcEnv = FederatorClientVersionedEnv {..} + sendNotificationIgnoringVersionMismatch fcEnv notif.targetComponent notif.path notif.body + lift $ ack envelope + metrics <- asks backendNotificationMetrics + withLabel metrics.pushedCounter (domainText targetDomain) incCounter + withLabel metrics.stuckQueuesGauge (domainText targetDomain) (flip setGauge 0) + Right bundle -> do + federator <- asks (.federatorInternal) + manager <- asks http2Manager + let env = + FederatorClientEnv + { ceOriginDomain = ownDomain . NE.head $ bundle.notifications, + ceTargetDomain = targetDomain, + ceFederator = federator, + ceHttp2Manager = manager, + ceOriginRequestId = + fromMaybe (RequestId "N/A") . (.requestId) . NE.head $ bundle.notifications + } + remoteVersions :: Set Int <- + liftIO + -- use versioned client with no version set: since we are manually + -- performing version negotiation, we don't want the client to + -- negotiate a version for us + ( runVersionedFederatorClient @'Brig (unversionedEnv env) $ + fedClientIn @'Brig @"api-version" () + ) + >>= \case + Left e -> do + Log.err $ + Log.msg (Log.val "Failed to get supported API versions") + . Log.field "domain" (domainText targetDomain) + . Log.field "error" (displayException e) + throwM e + Right vi -> pure . Set.fromList . vinfoSupported $ vi - -- FUTUREWORK: This rejects the message without any requeueing. This is - -- dangerous as it could happen that a new type of notification is - -- introduced and an old instance of this worker is running, in which case - -- the notification will just get dropped. On the other hand not dropping - -- this message blocks the whole queue. Perhaps there is a better way to - -- deal with this. - lift $ reject envelope False - Right notif -> do - ceFederator <- asks (.federatorInternal) - ceHttp2Manager <- asks http2Manager - let ceOriginDomain = notif.ownDomain - ceTargetDomain = targetDomain - ceOriginRequestId = fromMaybe (RequestId "N/A") notif.requestId - fcEnv = FederatorClientEnv {..} - liftIO $ either throwM pure =<< sendNotification fcEnv notif.targetComponent notif.path notif.body - lift $ ack envelope - metrics <- asks backendNotificationMetrics - withLabel metrics.pushedCounter (domainText targetDomain) incCounter - withLabel metrics.stuckQueuesGauge (domainText targetDomain) (flip setGauge 0) + -- compute the best usable version in a notification + let bestVersion = bodyVersions >=> flip latestCommonVersion remoteVersions + case pairedMaximumOn bestVersion (toList (notifications bundle)) of + (_, Nothing) -> + Log.fatal $ + Log.msg (Log.val "No federation API version in common, the notification will be ignored") + . Log.field "domain" (domainText targetDomain) + (notif, cveVersion) -> do + ceFederator <- asks (.federatorInternal) + ceHttp2Manager <- asks http2Manager + let ceOriginDomain = notif.ownDomain + ceTargetDomain = targetDomain + ceOriginRequestId = fromMaybe (RequestId "N/A") notif.requestId + cveEnv = FederatorClientEnv {..} + fcEnv = FederatorClientVersionedEnv {..} + sendNotificationIgnoringVersionMismatch fcEnv notif.targetComponent notif.path notif.body + lift $ ack envelope + metrics <- asks backendNotificationMetrics + withLabel metrics.pushedCounter (domainText targetDomain) incCounter + withLabel metrics.stuckQueuesGauge (domainText targetDomain) (flip setGauge 0) + +sendNotificationIgnoringVersionMismatch :: + FederatorClientVersionedEnv -> + Component -> + Text -> + RawJson -> + AppT IO () +sendNotificationIgnoringVersionMismatch env comp path body = + liftIO (sendNotification env comp path body) >>= \case + Left (FederatorClientVersionNegotiationError v) -> do + Log.fatal $ + Log.msg (Log.val "Federator version negotiation error") + . Log.field "domain" (domainText env.cveEnv.ceTargetDomain) + . Log.field "error" (show v) + pure () + Left e -> throwM e + Right () -> pure () + +-- | Find the pair that maximises b. +pairedMaximumOn :: Ord b => (a -> b) -> [a] -> (a, b) +pairedMaximumOn f = maximumBy (compare `on` snd) . map (id &&& f) -- FUTUREWORK: Recosider using 1 channel for many consumers. It shouldn't matter -- for a handful of remote domains. diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 2a458b6990e..472f02d1f2e 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -9,6 +9,7 @@ import Control.Monad.Trans.Except import Data.Aeson qualified as Aeson import Data.ByteString.Builder qualified as Builder import Data.ByteString.Lazy qualified as LBS +import Data.Default import Data.Domain import Data.Id import Data.Range @@ -37,9 +38,11 @@ import Test.QuickCheck import Test.Wire.Util import UnliftIO.Async import Util.Options +import Wire.API.Conversation.Action import Wire.API.Federation.API import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Common +import Wire.API.Federation.API.Galley import Wire.API.Federation.BackendNotifications import Wire.API.RawJson import Wire.BackendNotificationPusher @@ -51,7 +54,6 @@ spec :: Spec spec = do describe "pushNotification" $ do it "should push notifications" $ do - let returnSuccess _ = pure ("application/json", Aeson.encode EmptyResponse) let origDomain = Domain "origin.example.com" targetDomain = Domain "target.example.com" -- Just using 'arbitrary' could generate a very big list, making tests very @@ -64,6 +66,7 @@ spec = do ownDomain = origDomain, path = "/on-user-deleted-connections", body = RawJson $ Aeson.encode notifContent, + bodyVersions = Nothing, requestId = Just $ RequestId "N/A" } envelope <- newMockEnvelope @@ -74,7 +77,7 @@ spec = do } runningFlag <- newMVar () (env, fedReqs) <- - withTempMockFederator [] returnSuccess . runTestAppT $ do + withTempMockFederator def . runTestAppT $ do wait =<< pushNotification runningFlag targetDomain (msg, envelope) ask @@ -92,8 +95,88 @@ spec = do getVectorWith env.backendNotificationMetrics.pushedCounter getCounter `shouldReturn` [(domainText targetDomain, 1)] + it "should push notification bundles" $ do + let origDomain = Domain "origin.example.com" + targetDomain = Domain "target.example.com" + -- Just using 'arbitrary' could generate a very big list, making tests very + -- slow. Make me wonder if notification pusher should even try to parse the + -- actual content, seems like wasted compute power. + notifContent <- + generate $ + ClientRemovedRequest <$> arbitrary <*> arbitrary <*> arbitrary + let bundle = toBundle @'OnClientRemovedTag (RequestId "N/A") origDomain notifContent + envelope <- newMockEnvelope + let msg = + Q.newMsg + { Q.msgBody = Aeson.encode bundle, + Q.msgContentType = Just "application/json" + } + runningFlag <- newMVar () + (env, fedReqs) <- + withTempMockFederator def . runTestAppT $ do + wait =<< pushNotification runningFlag targetDomain (msg, envelope) + ask + + readIORef envelope.acks `shouldReturn` 1 + readIORef envelope.rejections `shouldReturn` [] + fedReqs + `shouldBe` [ FederatedRequest + { frTargetDomain = targetDomain, + frOriginDomain = origDomain, + frComponent = Galley, + frRPC = "on-client-removed", + frBody = Aeson.encode notifContent + } + ] + getVectorWith env.backendNotificationMetrics.pushedCounter getCounter + `shouldReturn` [(domainText targetDomain, 1)] + + it "should negotiate the best version" $ do + let origDomain = Domain "origin.example.com" + targetDomain = Domain "target.example.com" + update <- generate $ do + now <- arbitrary + user <- arbitrary + convId <- arbitrary + pure + ConversationUpdate + { time = now, + origUserId = user, + convId = convId, + alreadyPresentUsers = [], + action = SomeConversationAction SConversationLeaveTag () + } + let update0 = conversationUpdateToV0 update + let bundle = + toBundle (RequestId "N/A") origDomain update + <> toBundle (RequestId "N/A") origDomain update0 + envelope <- newMockEnvelope + let msg = + Q.newMsg + { Q.msgBody = Aeson.encode bundle, + Q.msgContentType = Just "application/json" + } + runningFlag <- newMVar () + (env, fedReqs) <- + withTempMockFederator def {versions = [0, 2]} . runTestAppT $ do + wait =<< pushNotification runningFlag targetDomain (msg, envelope) + ask + + readIORef envelope.acks `shouldReturn` 1 + readIORef envelope.rejections `shouldReturn` [] + fedReqs + `shouldBe` [ FederatedRequest + { frTargetDomain = targetDomain, + frOriginDomain = origDomain, + frComponent = Galley, + frRPC = "on-conversation-updated", + frBody = Aeson.encode update0 + } + ] + getVectorWith env.backendNotificationMetrics.pushedCounter getCounter + `shouldReturn` [(domainText targetDomain, 1)] + it "should reject invalid notifications" $ do - let returnSuccess _ = pure ("application/json", Aeson.encode EmptyResponse) envelope <- newMockEnvelope let msg = Q.newMsg @@ -102,7 +185,7 @@ spec = do } runningFlag <- newMVar () (env, fedReqs) <- - withTempMockFederator [] returnSuccess . runTestAppT $ do + withTempMockFederator def . runTestAppT $ do wait =<< pushNotification runningFlag (Domain "target.example.com") (msg, envelope) ask @@ -131,6 +214,7 @@ spec = do ownDomain = origDomain, path = "/on-user-deleted-connections", body = RawJson $ Aeson.encode notifContent, + bodyVersions = Nothing, requestId = Just $ RequestId "N/A" } envelope <- newMockEnvelope @@ -142,7 +226,7 @@ spec = do runningFlag <- newMVar () env <- testEnv pushThread <- - async $ withTempMockFederator [] mockRemote . runTestAppTWithEnv env $ do + async $ withTempMockFederator def {handler = mockRemote} . runTestAppTWithEnv env $ do wait =<< pushNotification runningFlag targetDomain (msg, envelope) -- Wait for two calls, so we can be sure that the metric about stuck diff --git a/services/brig/src/Brig/Federation/Client.hs b/services/brig/src/Brig/Federation/Client.hs index ad697843719..d86f706169f 100644 --- a/services/brig/src/Brig/Federation/Client.hs +++ b/services/brig/src/Brig/Federation/Client.hs @@ -157,8 +157,7 @@ notifyUserDeleted self remotes = do view rabbitmqChannel >>= \case Just chanVar -> do enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $ - void $ - fedQueueClient @'OnUserDeletedConnectionsTag notif + fedQueueClient @'OnUserDeletedConnectionsTag notif Nothing -> Log.err $ Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString) diff --git a/services/brig/test/integration/API/Federation.hs b/services/brig/test/integration/API/Federation.hs index 2f0def2baef..3b5829f1c5a 100644 --- a/services/brig/test/integration/API/Federation.hs +++ b/services/brig/test/integration/API/Federation.hs @@ -373,4 +373,4 @@ testGetUserClientsNotFound fedBrigClient = do testAPIVersion :: Brig -> FedClient 'Brig -> Http () testAPIVersion _brig fedBrigClient = do vinfo <- runFedClient @"api-version" fedBrigClient (Domain "far-away.example.com") () - liftIO $ vinfoSupported vinfo @?= toList supportedVersions + liftIO $ vinfoSupported vinfo @?= map versionInt (toList supportedVersions) diff --git a/services/brig/test/integration/Federation/Util.hs b/services/brig/test/integration/Federation/Util.hs index cb0eb3e35bb..4a1376e8686 100644 --- a/services/brig/test/integration/Federation/Util.hs +++ b/services/brig/test/integration/Federation/Util.hs @@ -35,6 +35,7 @@ import Data.Aeson (FromJSON, Value, decode, (.=)) import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.ByteString.Conversion (toByteString') +import Data.Default import Data.Domain (Domain (Domain)) import Data.Handle (fromHandle) import Data.Id @@ -79,8 +80,7 @@ import Wire.API.User.Client.Prekey withTempMockFederator :: Opt.Opts -> LByteString -> Session a -> IO (a, [Mock.FederatedRequest]) withTempMockFederator opts resp action = Mock.withTempMockFederator - [("Content-Type", "application/json")] - (const (pure ("application" // "json", resp))) + def {Mock.handler = const (pure ("application" // "json", resp))} $ \mockPort -> do let opts' = opts diff --git a/services/brig/test/integration/Util.hs b/services/brig/test/integration/Util.hs index 91cd6b674b6..46b8aa917c3 100644 --- a/services/brig/test/integration/Util.hs +++ b/services/brig/test/integration/Util.hs @@ -50,6 +50,7 @@ import Data.ByteString.Builder (toLazyByteString) import Data.ByteString.Char8 (pack) import Data.ByteString.Char8 qualified as B8 import Data.ByteString.Conversion +import Data.Default import Data.Domain (Domain (..), domainText, mkDomain) import Data.Handle (Handle (..)) import Data.Id @@ -1231,8 +1232,7 @@ withMockedFederatorAndGalley opts _domain fedResp galleyHandler action = do result <- assertRight <=< runExceptT $ withTempMockedService initState galleyHandler $ \galleyMockState -> Mock.withTempMockFederator - [("Content-Type", "application/json")] - ((\r -> pure ("application" // "json", r)) <=< fedResp) + def {Mock.handler = (\r -> pure ("application" // "json", r)) <=< fedResp} $ \fedMockPort -> do let opts' = opts diff --git a/services/cargohold/cargohold.cabal b/services/cargohold/cargohold.cabal index 7723b5814f9..f3c5ad95c44 100644 --- a/services/cargohold/cargohold.cabal +++ b/services/cargohold/cargohold.cabal @@ -269,6 +269,7 @@ executable cargohold-integration , cargohold , cargohold-types , containers + , data-default , federator , http-api-data , http-client >=0.7 diff --git a/services/cargohold/default.nix b/services/cargohold/default.nix index 9c9c13d493b..58b2e770a30 100644 --- a/services/cargohold/default.nix +++ b/services/cargohold/default.nix @@ -19,6 +19,7 @@ , conduit-extra , containers , crypton +, data-default , errors , exceptions , extended @@ -138,6 +139,7 @@ mkDerivation { bytestring-conversion cargohold-types containers + data-default federator HsOpenSSL http-api-data diff --git a/services/cargohold/test/integration/API/Util.hs b/services/cargohold/test/integration/API/Util.hs index 1c9e1057248..2c51dc9b29f 100644 --- a/services/cargohold/test/integration/API/Util.hs +++ b/services/cargohold/test/integration/API/Util.hs @@ -42,6 +42,7 @@ import Data.ByteString.Builder import qualified Data.ByteString.Char8 as C import Data.ByteString.Conversion import qualified Data.ByteString.Lazy as Lazy +import Data.Default import Data.Id import Data.Qualified import Data.Text.Encoding (decodeLatin1, encodeUtf8) @@ -223,7 +224,7 @@ withMockFederator :: TestM a -> TestM (a, [FederatedRequest]) withMockFederator respond action = do - withTempMockFederator [] respond $ \p -> + withTempMockFederator def {handler = respond} $ \p -> withSettingsOverrides (federator . _Just %~ setLocalEndpoint (fromIntegral p)) action diff --git a/services/federator/default.nix b/services/federator/default.nix index 0871b678a85..af4aa3d502b 100644 --- a/services/federator/default.nix +++ b/services/federator/default.nix @@ -88,6 +88,7 @@ mkDerivation { containers crypton-x509 crypton-x509-validation + data-default dns dns-util exceptions diff --git a/services/federator/federator.cabal b/services/federator/federator.cabal index fe6fa823004..dec1a6d01e1 100644 --- a/services/federator/federator.cabal +++ b/services/federator/federator.cabal @@ -115,6 +115,7 @@ library , containers , crypton-x509 , crypton-x509-validation + , data-default , dns , dns-util , exceptions diff --git a/services/federator/src/Federator/MockServer.hs b/services/federator/src/Federator/MockServer.hs index 81b657d9760..463967531ba 100644 --- a/services/federator/src/Federator/MockServer.hs +++ b/services/federator/src/Federator/MockServer.hs @@ -19,6 +19,7 @@ module Federator.MockServer ( -- * Federator mock server + MockFederator (..), MockException (..), withTempMockFederator, FederatedRequest (..), @@ -44,6 +45,7 @@ import Control.Monad.Catch hiding (fromException) import Control.Monad.Trans.Except import Control.Monad.Trans.Maybe import Data.Aeson qualified as Aeson +import Data.Default import Data.Domain import Data.Text qualified as Text import Data.Text.Lazy qualified as LText @@ -66,6 +68,7 @@ import Servant.API import Servant.Server (Tagged (..)) import Servant.Server.Generic import Wire.API.Federation.API (Component) +import Wire.API.Federation.API.Common import Wire.API.Federation.Domain import Wire.API.Federation.Version import Wire.Sem.Logger.TinyLog @@ -104,16 +107,15 @@ mockServer :: Member (Error ValidationError) r ) => IORef [FederatedRequest] -> - [HTTP.Header] -> - (FederatedRequest -> IO (HTTP.MediaType, LByteString)) -> + MockFederator -> (Sem r Wai.Response -> IO Wai.Response) -> API AsServer -mockServer remoteCalls headers resp interpreter = +mockServer remoteCalls mock interpreter = Federator.InternalServer.API { status = const $ pure NoContent, internalRequest = \_mReqId targetDomain component rpc -> Tagged $ \req respond -> - respond =<< interpreter (mockInternalRequest remoteCalls headers resp targetDomain component rpc req) + respond =<< interpreter (mockInternalRequest remoteCalls mock targetDomain component rpc req) } mockInternalRequest :: @@ -123,14 +125,13 @@ mockInternalRequest :: Member (Error ValidationError) r ) => IORef [FederatedRequest] -> - [HTTP.Header] -> - (FederatedRequest -> IO (HTTP.MediaType, LByteString)) -> + MockFederator -> Domain -> Component -> RPC -> Wai.Request -> Sem r Wai.Response -mockInternalRequest remoteCalls headers resp targetDomain component (RPC path) req = do +mockInternalRequest remoteCalls mock targetDomain component (RPC path) req = do domainTxt <- note NoOriginDomain $ lookup originDomainHeaderName (Wai.requestHeaders req) originDomain <- parseDomain domainTxt reqBody <- embed $ Wai.lazyRequestBody req @@ -145,20 +146,34 @@ mockInternalRequest remoteCalls headers resp targetDomain component (RPC path) r ) (ct, resBody) <- if path == "api-version" - then pure ("application/json", Aeson.encode versionInfo) + then pure ("application/json", Aeson.encode (VersionInfo mock.versions)) else do modifyIORef remoteCalls (<> [fedRequest]) fromException @MockException . handle (throw . handleException) - $ resp fedRequest - let headers' = ("Content-Type", HTTP.renderHeader ct) : headers - pure $ Wai.responseLBS HTTP.status200 headers' resBody + $ mock.handler fedRequest + let headers = ("Content-Type", HTTP.renderHeader ct) : mock.headers + pure $ Wai.responseLBS HTTP.status200 headers resBody where handleException :: SomeException -> MockException handleException e = case Exception.fromException e of Just mockE -> mockE Nothing -> MockErrorResponse HTTP.status500 (LText.pack (displayException e)) +data MockFederator = MockFederator + { headers :: [HTTP.Header], + handler :: FederatedRequest -> IO (HTTP.MediaType, LByteString), + versions :: [Int] + } + +instance Default MockFederator where + def = + MockFederator + { headers = [], + handler = \_ -> pure ("application/json", Aeson.encode EmptyResponse), + versions = map versionInt (toList supportedVersions) + } + -- | Spawn a mock federator on a random port and run an action while it is running. -- -- A mock federator is a web application that parses requests of the same form @@ -166,11 +181,10 @@ mockInternalRequest remoteCalls headers resp targetDomain component (RPC path) r -- forwarding them to a remote federator. withTempMockFederator :: (MonadIO m, MonadMask m) => - [HTTP.Header] -> - (FederatedRequest -> IO (HTTP.MediaType, LByteString)) -> + MockFederator -> (Warp.Port -> m a) -> m (a, [FederatedRequest]) -withTempMockFederator headers resp action = do +withTempMockFederator mock action = do remoteCalls <- newIORef [] let interpreter = runM @@ -180,7 +194,7 @@ withTempMockFederator headers resp action = do ServerError, MockException ] - app = genericServe (mockServer remoteCalls headers resp interpreter) + app = genericServe (mockServer remoteCalls mock interpreter) result <- bracket (liftIO (startMockServer Nothing app)) diff --git a/services/federator/test/unit/Test/Federator/Client.hs b/services/federator/test/unit/Test/Federator/Client.hs index 2b79f8ab2a1..6100252dbbe 100644 --- a/services/federator/test/unit/Test/Federator/Client.hs +++ b/services/federator/test/unit/Test/Federator/Client.hs @@ -26,6 +26,7 @@ import Data.Bifunctor (first) import Data.ByteString qualified as BS import Data.ByteString.Builder (Builder, byteString, toLazyByteString) import Data.ByteString.Lazy qualified as LBS +import Data.Default import Data.Domain import Data.Id import Data.Proxy @@ -60,9 +61,6 @@ targetDomain = Domain "target.example.com" originDomain :: Domain originDomain = Domain "origin.example.com" -defaultHeaders :: [HTTP.Header] -defaultHeaders = [("Content-Type", "application/json")] - tests :: TestTree tests = testGroup @@ -87,11 +85,10 @@ newtype ResponseFailure = ResponseFailure Wai.Error deriving (Show) withMockFederatorClient :: - [HTTP.Header] -> - (FederatedRequest -> IO (MediaType, LByteString)) -> + MockFederator -> FederatorClient c a -> IO (Either ResponseFailure a, [FederatedRequest]) -withMockFederatorClient headers resp action = withTempMockFederator headers resp $ \port -> do +withMockFederatorClient mock action = withTempMockFederator mock $ \port -> do mgr <- defaultHttp2Manager let env = FederatorClientEnv @@ -114,8 +111,7 @@ testClientSuccess = do (actualResponse, sentRequests) <- withMockFederatorClient - defaultHeaders - (const (pure ("application/json", Aeson.encode (Just expectedResponse)))) + def {handler = const (pure ("application/json", Aeson.encode (Just expectedResponse)))} $ fedClient @'Brig @"get-user-by-handle" handle sentRequests @@ -157,8 +153,7 @@ testClientFailure = do (actualResponse, _) <- withMockFederatorClient - defaultHeaders - (const (throw (MockErrorResponse HTTP.status422 "wrong domain"))) + def {handler = const (throw (MockErrorResponse HTTP.status422 "wrong domain"))} $ do fedClient @'Brig @"get-user-by-handle" handle @@ -174,8 +169,7 @@ testFederatorFailure = do (actualResponse, _) <- withMockFederatorClient - defaultHeaders - (const (throw (MockErrorResponse HTTP.status403 "invalid path"))) + def {handler = const (throw (MockErrorResponse HTTP.status403 "invalid path"))} $ do fedClient @'Brig @"get-user-by-handle" handle @@ -190,7 +184,7 @@ testClientExceptions = do handle <- generate arbitrary (response, _) <- - withMockFederatorClient defaultHeaders (const (evaluate (error "unhandled exception"))) $ + withMockFederatorClient def {handler = const (evaluate (error "unhandled exception"))} $ fedClient @'Brig @"get-user-by-handle" handle case response of @@ -218,8 +212,10 @@ testClientConnectionError = do testResponseHeaders :: IO () testResponseHeaders = do (r, _) <- withTempMockFederator - [("X-Foo", "bar")] - (const $ pure ("application" // "json", mempty)) + def + { headers = [("X-Foo", "bar")], + handler = const $ pure ("application" // "json", mempty) + } $ \port -> do let req = HTTP2.requestBuilder diff --git a/services/galley/src/Galley/API/Action.hs b/services/galley/src/Galley/API/Action.hs index 0b373dc9574..063b985fd52 100644 --- a/services/galley/src/Galley/API/Action.hs +++ b/services/galley/src/Galley/API/Action.hs @@ -720,7 +720,6 @@ updateLocalConversation :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member (Logger (Log.Msg -> Log.Msg)) r, HasConversationActionEffects tag r, SingI tag ) => @@ -760,7 +759,6 @@ updateLocalConversationUnchecked :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member (Logger (Log.Msg -> Log.Msg)) r, HasConversationActionEffects tag r ) => Local Conversation -> @@ -861,9 +859,9 @@ notifyConversationAction :: forall tag r. ( Member BackendNotificationQueueAccess r, Member ExternalAccess r, + Member (Error FederationError) r, Member NotificationSubsystem r, - Member (Input UTCTime) r, - Member (Logger (Log.Msg -> Log.Msg)) r + Member (Input UTCTime) r ) => Sing tag -> Qualified UserId -> @@ -884,22 +882,19 @@ notifyConversationAction tag quid notifyOrigDomain con lconv targets action = do (tUnqualified lcnv) uids (SomeConversationAction tag action) - handleError :: FederationError -> Sem r (Maybe ConversationUpdate) - handleError fedErr = - logRemoteNotificationError @"on-conversation-updated" fedErr $> Nothing - update <- - fmap (fromMaybe (mkUpdate [])) - . (either handleError (pure . asum . map tUnqualified)) - <=< enqueueNotificationsConcurrently Q.Persistent (toList (bmRemotes targets)) - $ \ruids -> do - let update = mkUpdate (tUnqualified ruids) - -- if notifyOrigDomain is false, filter out user from quid's domain, - -- because quid's backend will update local state and notify its users - -- itself using the ConversationUpdate returned by this function - if notifyOrigDomain || tDomain ruids /= qDomain quid - then fedQueueClient @'OnConversationUpdatedTag update $> Nothing - else pure (Just update) + fmap (fromMaybe (mkUpdate []) . asum . map tUnqualified) $ + enqueueNotificationsConcurrently Q.Persistent (toList (bmRemotes targets)) $ + \ruids -> do + let update = mkUpdate (tUnqualified ruids) + -- if notifyOrigDomain is false, filter out user from quid's domain, + -- because quid's backend will update local state and notify its users + -- itself using the ConversationUpdate returned by this function + if notifyOrigDomain || tDomain ruids /= qDomain quid + then do + makeConversationUpdateBundle update >>= sendBundle + pure Nothing + else pure (Just update) -- notify local participants and bots pushConversationEvent con e (qualifyAs lcnv (bmLocals targets)) (bmBots targets) @@ -924,14 +919,14 @@ updateLocalStateOfRemoteConv :: updateLocalStateOfRemoteConv rcu con = do loc <- qualifyLocal () let cu = tUnqualified rcu - rconvId = fmap F.cuConvId rcu + rconvId = fmap (.convId) rcu qconvId = tUntagged rconvId -- Note: we generally do not send notifications to users that are not part of -- the conversation (from our point of view), to prevent spam from the remote -- backend. See also the comment below. (presentUsers, allUsersArePresent) <- - E.selectRemoteMembers (F.cuAlreadyPresentUsers cu) rconvId + E.selectRemoteMembers cu.alreadyPresentUsers rconvId -- Perform action, and determine extra notification targets. -- @@ -942,12 +937,12 @@ updateLocalStateOfRemoteConv rcu con = do -- updated, we do **not** add them to the list of targets, because we have no -- way to make sure that they are actually supposed to receive that notification. - (mActualAction, extraTargets) <- case F.cuAction cu of + (mActualAction, extraTargets) <- case cu.action of sca@(SomeConversationAction singTag action) -> case singTag of SConversationJoinTag -> do let ConversationJoin toAdd role = action let (localUsers, remoteUsers) = partitionQualified loc toAdd - addedLocalUsers <- Set.toList <$> addLocalUsersToRemoteConv rconvId (F.cuOrigUserId cu) localUsers + addedLocalUsers <- Set.toList <$> addLocalUsersToRemoteConv rconvId cu.origUserId localUsers let allAddedUsers = map (tUntagged . qualifyAs loc) addedLocalUsers <> map tUntagged remoteUsers pure $ ( fmap @@ -956,7 +951,7 @@ updateLocalStateOfRemoteConv rcu con = do addedLocalUsers ) SConversationLeaveTag -> do - let users = foldQualified loc (pure . tUnqualified) (const []) (F.cuOrigUserId cu) + let users = foldQualified loc (pure . tUnqualified) (const []) cu.origUserId E.deleteMembersInRemoteConversation rconvId users pure (Just sca, []) SConversationRemoveMembersTag -> do @@ -976,7 +971,7 @@ updateLocalStateOfRemoteConv rcu con = do unless allUsersArePresent $ P.warn $ - Log.field "conversation" (toByteString' (F.cuConvId cu)) + Log.field "conversation" (toByteString' cu.convId) . Log.field "domain" (toByteString' (tDomain rcu)) . Log.msg ( "Attempt to send notification about conversation update \ @@ -986,7 +981,7 @@ updateLocalStateOfRemoteConv rcu con = do -- Send notifications for mActualAction $ \(SomeConversationAction tag action) -> do - let event = conversationActionToEvent tag (F.cuTime cu) (F.cuOrigUserId cu) qconvId Nothing action + let event = conversationActionToEvent tag cu.time cu.origUserId qconvId Nothing action targets = nubOrd $ presentUsers <> extraTargets -- FUTUREWORK: support bots? pushConversationEvent con event (qualifyAs loc targets) [] $> event diff --git a/services/galley/src/Galley/API/Clients.hs b/services/galley/src/Galley/API/Clients.hs index 9ae38817dc8..cfb18cd320a 100644 --- a/services/galley/src/Galley/API/Clients.hs +++ b/services/galley/src/Galley/API/Clients.hs @@ -50,6 +50,7 @@ import Polysemy.TinyLog qualified as P import Wire.API.Conversation hiding (Member) import Wire.API.Federation.API import Wire.API.Federation.API.Galley +import Wire.API.Federation.Error import Wire.API.Routes.MultiTablePaging import Wire.NotificationSubsystem import Wire.Sem.Paging.Cassandra (CassandraPaging) @@ -91,23 +92,22 @@ addClientH (usr ::: clt) = do rmClientH :: forall p1 r. ( p1 ~ CassandraPaging, - ( Member ClientStore r, - Member ConversationStore r, - Member ExternalAccess r, - Member BackendNotificationQueueAccess r, - Member FederatorAccess r, - Member NotificationSubsystem r, - Member (Input Env) r, - Member (Input (Local ())) r, - Member (Input UTCTime) r, - Member (ListItems p1 ConvId) r, - Member (ListItems p1 (Remote ConvId)) r, - Member MemberStore r, - Member (Error InternalError) r, - Member ProposalStore r, - Member SubConversationStore r, - Member P.TinyLog r - ) + Member ClientStore r, + Member ConversationStore r, + Member (Error FederationError) r, + Member ExternalAccess r, + Member BackendNotificationQueueAccess r, + Member NotificationSubsystem r, + Member (Input Env) r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ListItems p1 ConvId) r, + Member (ListItems p1 (Remote ConvId)) r, + Member MemberStore r, + Member (Error InternalError) r, + Member ProposalStore r, + Member SubConversationStore r, + Member P.TinyLog r ) => UserId ::: ClientId -> Sem r Response @@ -138,5 +138,8 @@ rmClientH (usr ::: cid) = do removeRemoteMLSClients :: Range 1 1000 [Remote ConvId] -> Sem r () removeRemoteMLSClients convIds = do for_ (bucketRemote (fromRange convIds)) $ \remoteConvs -> - let rpc = void $ fedQueueClient @'OnClientRemovedTag (ClientRemovedRequest usr cid (tUnqualified remoteConvs)) - in enqueueNotification remoteConvs Q.Persistent rpc + let rpc = + fedQueueClient + @'OnClientRemovedTag + (ClientRemovedRequest usr cid (tUnqualified remoteConvs)) + in enqueueNotification Q.Persistent remoteConvs rpc diff --git a/services/galley/src/Galley/API/Create.hs b/services/galley/src/Galley/API/Create.hs index f255c0d9658..837a79dfd7e 100644 --- a/services/galley/src/Galley/API/Create.hs +++ b/services/galley/src/Galley/API/Create.hs @@ -91,7 +91,8 @@ import Wire.NotificationSubsystem -- | The public-facing endpoint for creating group conversations in the client -- API up to and including version 3. createGroupConversationUpToV3 :: - ( Member BrigAccess r, + ( Member BackendNotificationQueueAccess r, + Member BrigAccess r, Member ConversationStore r, Member (ErrorS 'ConvAccessDenied) r, Member (Error FederationError) r, @@ -129,7 +130,8 @@ createGroupConversationUpToV3 lusr conn newConv = mapError UnreachableBackendsLe -- | The public-facing endpoint for creating group conversations in the client -- API in version 4 and above. createGroupConversation :: - ( Member BrigAccess r, + ( Member BackendNotificationQueueAccess r, + Member BrigAccess r, Member ConversationStore r, Member (ErrorS 'ConvAccessDenied) r, Member (Error FederationError) r, @@ -169,7 +171,8 @@ createGroupConversation lusr conn newConv = do CreateGroupConversation conv mempty createGroupConversationGeneric :: - ( Member BrigAccess r, + ( Member BackendNotificationQueueAccess r, + Member BrigAccess r, Member ConversationStore r, Member (ErrorS 'ConvAccessDenied) r, Member (Error FederationError) r, @@ -309,7 +312,8 @@ createProteusSelfConversation lusr = do conversationCreated lusr c createOne2OneConversation :: - ( Member BrigAccess r, + ( Member BackendNotificationQueueAccess r, + Member BrigAccess r, Member ConversationStore r, Member (Error FederationError) r, Member (Error InternalError) r, @@ -386,7 +390,8 @@ createOne2OneConversation lusr zcon j = Nothing -> throwS @'TeamNotFound createLegacyOne2OneConversationUnchecked :: - ( Member ConversationStore r, + ( Member BackendNotificationQueueAccess r, + Member ConversationStore r, Member (Error FederationError) r, Member (Error InternalError) r, Member (Error InvalidInput) r, @@ -428,7 +433,8 @@ createLegacyOne2OneConversationUnchecked self zcon name mtid other = do Right () -> conversationCreated self c createOne2OneConversationUnchecked :: - ( Member ConversationStore r, + ( Member BackendNotificationQueueAccess r, + Member ConversationStore r, Member (Error FederationError) r, Member (Error InternalError) r, Member (Error UnreachableBackends) r, @@ -452,7 +458,8 @@ createOne2OneConversationUnchecked self zcon name mtid other = do create (one2OneConvId BaseProtocolProteusTag (tUntagged self) other) self zcon name mtid other createOne2OneConversationLocally :: - ( Member ConversationStore r, + ( Member BackendNotificationQueueAccess r, + Member ConversationStore r, Member (Error FederationError) r, Member (Error InternalError) r, Member (Error UnreachableBackends) r, @@ -502,7 +509,8 @@ createOne2OneConversationRemotely _ _ _ _ _ _ = throw FederationNotImplemented createConnectConversation :: - ( Member ConversationStore r, + ( Member BackendNotificationQueueAccess r, + Member ConversationStore r, Member (ErrorS 'ConvNotFound) r, Member (Error FederationError) r, Member (Error InternalError) r, @@ -654,6 +662,7 @@ notifyCreatedConversation :: Member (Error UnreachableBackends) r, Member FederatorAccess r, Member NotificationSubsystem r, + Member BackendNotificationQueueAccess r, Member (Input UTCTime) r, Member P.TinyLog r ) => diff --git a/services/galley/src/Galley/API/Federation.hs b/services/galley/src/Galley/API/Federation.hs index 7e292c55aab..6bee0e21f14 100644 --- a/services/galley/src/Galley/API/Federation.hs +++ b/services/galley/src/Galley/API/Federation.hs @@ -84,7 +84,9 @@ import Wire.API.Event.Conversation import Wire.API.Federation.API import Wire.API.Federation.API.Common (EmptyResponse (..)) import Wire.API.Federation.API.Galley +import Wire.API.Federation.Endpoint import Wire.API.Federation.Error +import Wire.API.Federation.Version import Wire.API.MLS.Credential import Wire.API.MLS.GroupInfo import Wire.API.MLS.Serialisation @@ -119,6 +121,7 @@ federationSitemap = :<|> Named @"on-client-removed" onClientRemoved :<|> Named @"on-message-sent" onMessageSent :<|> Named @"on-mls-message-sent" onMLSMessageSent + :<|> Named @(Versioned 'V0 "on-conversation-updated") onConversationUpdatedV0 :<|> Named @"on-conversation-updated" onConversationUpdated :<|> Named @"on-user-deleted-conversations" onUserDeleted @@ -126,6 +129,7 @@ onClientRemoved :: ( Member BackendNotificationQueueAccess r, Member ConversationStore r, Member ExternalAccess r, + Member (Error FederationError) r, Member NotificationSubsystem r, Member (Input Env) r, Member (Input (Local ())) r, @@ -225,6 +229,20 @@ onConversationUpdated requestingDomain cu = do void $ updateLocalStateOfRemoteConv rcu Nothing pure EmptyResponse +onConversationUpdatedV0 :: + ( Member BrigAccess r, + Member NotificationSubsystem r, + Member ExternalAccess r, + Member (Input (Local ())) r, + Member MemberStore r, + Member P.TinyLog r + ) => + Domain -> + ConversationUpdateV0 -> + Sem r EmptyResponse +onConversationUpdatedV0 domain cu = + onConversationUpdated domain (conversationUpdateFromV0 cu) + -- as of now this will not generate the necessary events on the leaver's domain leaveConversation :: ( Member BackendNotificationQueueAccess r, @@ -378,6 +396,7 @@ onUserDeleted :: ( Member BackendNotificationQueueAccess r, Member ConversationStore r, Member FireAndForget r, + Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, Member (Input (Local ())) r, @@ -464,7 +483,7 @@ updateConversation origDomain updateRequest = do let rusr = toRemoteUnsafe origDomain updateRequest.user lcnv = qualifyAs loc updateRequest.convId - mkResponse $ case action updateRequest of + mkResponse $ case updateRequest.action of SomeConversationAction tag action -> case tag of SConversationJoinTag -> mapToGalleyError @(HasConversationActionGalleyErrors 'ConversationJoinTag) @@ -662,6 +681,7 @@ getSubConversationForRemoteUser domain GetSubConversationsRequest {..} = leaveSubConversation :: ( HasLeaveSubConversationEffects r, + Member (Error FederationError) r, Member (Input (Local ())) r, Member Resource r ) => diff --git a/services/galley/src/Galley/API/Internal.hs b/services/galley/src/Galley/API/Internal.hs index 2e4d7435980..15d0107d009 100644 --- a/services/galley/src/Galley/API/Internal.hs +++ b/services/galley/src/Galley/API/Internal.hs @@ -56,7 +56,6 @@ import Galley.Effects import Galley.Effects.BackendNotificationQueueAccess import Galley.Effects.ClientStore import Galley.Effects.ConversationStore -import Galley.Effects.FederatorAccess import Galley.Effects.LegalHoldStore as LegalHoldStore import Galley.Effects.MemberStore qualified as E import Galley.Effects.TeamStore @@ -303,9 +302,9 @@ rmUser :: Member ClientStore r, Member ConversationStore r, Member (Error DynError) r, + Member (Error FederationError) r, Member (Error InternalError) r, Member ExternalAccess r, - Member FederatorAccess r, Member NotificationSubsystem r, Member (Input Env) r, Member (Input Opts) r, @@ -411,39 +410,22 @@ rmUser lusr conn = do notifyRemoteMembers now qUser cid remotes = do let convUpdate = ConversationUpdate - { cuTime = now, - cuOrigUserId = qUser, - cuConvId = cid, - cuAlreadyPresentUsers = tUnqualified remotes, - cuAction = SomeConversationAction (sing @'ConversationLeaveTag) () + { time = now, + origUserId = qUser, + convId = cid, + alreadyPresentUsers = tUnqualified remotes, + action = SomeConversationAction (sing @'ConversationLeaveTag) () } - let rpc = fedClient @'Galley @"on-conversation-updated" convUpdate - runFederatedEither remotes rpc - >>= logAndIgnoreError "Error in onConversationUpdated call" (qUnqualified qUser) + enqueueNotification Q.Persistent remotes $ do + makeConversationUpdateBundle convUpdate + >>= sendBundle leaveRemoteConversations :: Range 1 UserDeletedNotificationMaxConvs [Remote ConvId] -> Sem r () leaveRemoteConversations cids = for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs)) - let rpc = void $ fedQueueClient @'OnUserDeletedConversationsTag userDelete - enqueueNotification remoteConvs Q.Persistent rpc - - -- FUTUREWORK: Add a retry mechanism if there are federation errrors. - -- See https://wearezeta.atlassian.net/browse/SQCORE-1091 - logAndIgnoreError :: Text -> UserId -> Either FederationError a -> Sem r () - logAndIgnoreError message usr res = do - case res of - Left federationError -> - P.err - ( Log.msg - ( "Federation error while notifying remote backends of a user deletion (Galley). " - <> message - <> " " - <> (cs . show $ federationError) - ) - . Log.field "user" (show usr) - ) - Right _ -> pure () + let rpc = fedQueueClient @'OnUserDeletedConversationsTag userDelete + enqueueNotification Q.Persistent remoteConvs rpc deleteLoop :: App () deleteLoop = do diff --git a/services/galley/src/Galley/API/MLS/Commit/ExternalCommit.hs b/services/galley/src/Galley/API/MLS/Commit/ExternalCommit.hs index 907e9ecb36d..484f5812332 100644 --- a/services/galley/src/Galley/API/MLS/Commit/ExternalCommit.hs +++ b/services/galley/src/Galley/API/MLS/Commit/ExternalCommit.hs @@ -41,6 +41,7 @@ import Polysemy.State import Wire.API.Conversation.Protocol import Wire.API.Error import Wire.API.Error.Galley +import Wire.API.Federation.Error import Wire.API.MLS.Commit import Wire.API.MLS.Credential import Wire.API.MLS.LeafNode @@ -121,7 +122,8 @@ getExternalCommitData senderIdentity lConvOrSub epoch commit = do processExternalCommit :: forall r. - ( Member (ErrorS 'MLSStaleMessage) r, + ( Member (Error FederationError) r, + Member (ErrorS 'MLSStaleMessage) r, Member (ErrorS 'MLSSubConvClientNotInParent) r, Member Resource r, HasProposalActionEffects r diff --git a/services/galley/src/Galley/API/MLS/Message.hs b/services/galley/src/Galley/API/MLS/Message.hs index 3afffb4d0a3..e49bd404de0 100644 --- a/services/galley/src/Galley/API/MLS/Message.hs +++ b/services/galley/src/Galley/API/MLS/Message.hs @@ -115,7 +115,6 @@ type MLSBundleStaticErrors = postMLSMessageFromLocalUser :: ( HasProposalEffects r, - Member (Error FederationError) r, Member (ErrorS 'ConvAccessDenied) r, Member (ErrorS 'ConvMemberNotFound) r, Member (ErrorS 'ConvNotFound) r, @@ -149,7 +148,6 @@ postMLSMessageFromLocalUser lusr c conn smsg = do postMLSCommitBundle :: ( HasProposalEffects r, Members MLSBundleStaticErrors r, - Member (Error FederationError) r, Member Resource r, Member SubConversationStore r ) => @@ -171,7 +169,6 @@ postMLSCommitBundle loc qusr c ctype qConvOrSub conn bundle = postMLSCommitBundleFromLocalUser :: ( HasProposalEffects r, Members MLSBundleStaticErrors r, - Member (Error FederationError) r, Member Resource r, Member SubConversationStore r ) => @@ -318,7 +315,6 @@ postMLSCommitBundleToRemoteConv loc qusr c con bundle ctype rConvOrSubId = do postMLSMessage :: ( HasProposalEffects r, - Member (Error FederationError) r, Member (ErrorS 'ConvAccessDenied) r, Member (ErrorS 'ConvMemberNotFound) r, Member (ErrorS 'ConvNotFound) r, @@ -417,7 +413,6 @@ postMLSMessageToLocalConv qusr c con msg ctype convOrSubId = do postMLSMessageToRemoteConv :: ( Members MLSMessageStaticErrors r, - Member (Error FederationError) r, HasProposalEffects r ) => Local x -> diff --git a/services/galley/src/Galley/API/MLS/Propagate.hs b/services/galley/src/Galley/API/MLS/Propagate.hs index 53efadec2dc..b0fe16e6c8c 100644 --- a/services/galley/src/Galley/API/MLS/Propagate.hs +++ b/services/galley/src/Galley/API/MLS/Propagate.hs @@ -27,7 +27,6 @@ import Data.Qualified import Data.Time import Galley.API.MLS.Types import Galley.API.Push -import Galley.API.Util import Galley.Data.Services import Galley.Effects import Galley.Effects.BackendNotificationQueueAccess @@ -36,11 +35,13 @@ import Gundeck.Types.Push.V2 (RecipientClients (..)) import Imports import Network.AMQP qualified as Q import Polysemy +import Polysemy.Error import Polysemy.Input import Polysemy.TinyLog hiding (trace) import Wire.API.Event.Conversation import Wire.API.Federation.API import Wire.API.Federation.API.Galley +import Wire.API.Federation.Error import Wire.API.MLS.Credential import Wire.API.MLS.Message import Wire.API.MLS.Serialisation @@ -53,6 +54,7 @@ import Wire.NotificationSubsystem -- a requirement from Core Crypto and the clients. propagateMessage :: ( Member BackendNotificationQueueAccess r, + Member (Error FederationError) r, Member ExternalAccess r, Member (Input UTCTime) r, Member TinyLog r, @@ -87,21 +89,23 @@ propagateMessage qusr mSenderClient lConvOrSub con msg cm = do newMessagePush botMap con mm (lmems >>= toList . localMemberRecipient mlsConv) e -- send to remotes - (either (logRemoteNotificationError @"on-mls-message-sent") (const (pure ())) <=< enqueueNotificationsConcurrently Q.Persistent (map remoteMemberQualify rmems)) $ - \rs -> - fedQueueClient @'OnMLSMessageSentTag $ - RemoteMLSMessage - { time = now, - sender = qusr, - metadata = mm, - conversation = qUnqualified qcnv, - subConversation = sconv, - recipients = - Map.fromList $ - tUnqualified rs - >>= toList . remoteMemberMLSClients, - message = Base64ByteString msg.raw - } + void $ + enqueueNotificationsConcurrently Q.Persistent (map remoteMemberQualify rmems) $ + \rs -> + fedQueueClient + @'OnMLSMessageSentTag + RemoteMLSMessage + { time = now, + sender = qusr, + metadata = mm, + conversation = qUnqualified qcnv, + subConversation = sconv, + recipients = + Map.fromList $ + tUnqualified rs + >>= toList . remoteMemberMLSClients, + message = Base64ByteString msg.raw + } where cmWithoutSender = maybe cm (flip cmRemoveClient cm . mkClientIdentity qusr) mSenderClient diff --git a/services/galley/src/Galley/API/MLS/Proposal.hs b/services/galley/src/Galley/API/MLS/Proposal.hs index 39d56406b4c..9047db2a946 100644 --- a/services/galley/src/Galley/API/MLS/Proposal.hs +++ b/services/galley/src/Galley/API/MLS/Proposal.hs @@ -58,6 +58,7 @@ import Wire.API.Conversation hiding (Member) import Wire.API.Conversation.Protocol import Wire.API.Error import Wire.API.Error.Galley +import Wire.API.Federation.Error import Wire.API.MLS.AuthenticatedContent import Wire.API.MLS.Credential import Wire.API.MLS.KeyPackage @@ -116,6 +117,7 @@ type HasProposalEffects r = Member ConversationStore r, Member NotificationSubsystem r, Member (Error InternalError) r, + Member (Error FederationError) r, Member (Error MLSProposalFailure) r, Member (Error MLSProtocolError) r, Member (ErrorS 'MLSClientMismatch) r, diff --git a/services/galley/src/Galley/API/MLS/Removal.hs b/services/galley/src/Galley/API/MLS/Removal.hs index f48631e7d23..f8491cdba47 100644 --- a/services/galley/src/Galley/API/MLS/Removal.hs +++ b/services/galley/src/Galley/API/MLS/Removal.hs @@ -44,10 +44,12 @@ import Galley.Env import Galley.Types.Conversations.Members import Imports hiding (cs) import Polysemy +import Polysemy.Error import Polysemy.Input import Polysemy.TinyLog import System.Logger qualified as Log import Wire.API.Conversation.Protocol +import Wire.API.Federation.Error import Wire.API.MLS.AuthenticatedContent import Wire.API.MLS.Credential import Wire.API.MLS.LeafNode @@ -59,7 +61,8 @@ import Wire.NotificationSubsystem -- | Send remove proposals for a set of clients to clients in the ClientMap. createAndSendRemoveProposals :: - ( Member (Input UTCTime) r, + ( Member (Error FederationError) r, + Member (Input UTCTime) r, Member TinyLog r, Member BackendNotificationQueueAccess r, Member ExternalAccess r, @@ -106,7 +109,8 @@ createAndSendRemoveProposals lConvOrSubConv indices qusr cm = do propagateMessage qusr Nothing lConvOrSubConv Nothing msg cm removeClientsWithClientMapRecursively :: - ( Member (Input UTCTime) r, + ( Member (Error FederationError) r, + Member (Input UTCTime) r, Member TinyLog r, Member BackendNotificationQueueAccess r, Member ExternalAccess r, @@ -138,7 +142,8 @@ removeClientsWithClientMapRecursively lMlsConv getClients qusr = do removeClientsFromSubConvs lMlsConv getClients qusr removeClientsFromSubConvs :: - ( Member (Input UTCTime) r, + ( Member (Error FederationError) r, + Member (Input UTCTime) r, Member TinyLog r, Member BackendNotificationQueueAccess r, Member ExternalAccess r, @@ -177,6 +182,7 @@ removeClientsFromSubConvs lMlsConv getClients qusr = do -- | Send remove proposals for a single client of a user to the local conversation. removeClient :: ( Member BackendNotificationQueueAccess r, + Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, Member (Input Env) r, @@ -212,6 +218,7 @@ data RemoveUserIncludeMain -- | Send remove proposals for all clients of the user to the local conversation. removeUser :: ( Member BackendNotificationQueueAccess r, + Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, Member (Input Env) r, @@ -257,6 +264,7 @@ listSubConversations' cid = do -- | Send remove proposals for clients of users that are not part of a conversation removeExtraneousClients :: ( Member BackendNotificationQueueAccess r, + Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, Member (Input Env) r, diff --git a/services/galley/src/Galley/API/MLS/SubConversation.hs b/services/galley/src/Galley/API/MLS/SubConversation.hs index 7841a718396..ba13a24757b 100644 --- a/services/galley/src/Galley/API/MLS/SubConversation.hs +++ b/services/galley/src/Galley/API/MLS/SubConversation.hs @@ -377,6 +377,7 @@ leaveLocalSubConversation :: Member (Error MLSProtocolError) r, Member (ErrorS 'MLSStaleMessage) r, Member (ErrorS 'MLSNotEnabled) r, + Member (Error FederationError) r, Member Resource r, Members LeaveSubConversationStaticErrors r ) => diff --git a/services/galley/src/Galley/API/Message.hs b/services/galley/src/Galley/API/Message.hs index 355fccd7942..b436ea62250 100644 --- a/services/galley/src/Galley/API/Message.hs +++ b/services/galley/src/Galley/API/Message.hs @@ -680,26 +680,28 @@ sendRemoteMessages :: MessageMetadata -> Map (UserId, ClientId) Text -> Sem r (Set (UserId, ClientId)) -sendRemoteMessages domain now sender senderClient lcnv metadata messages = (handle =<<) $ do - let rcpts = - foldr - (\((u, c), t) -> Map.insertWith (<>) u (Map.singleton c t)) - mempty - (Map.assocs messages) - rm = - RemoteMessage - { time = now, - _data = mmData metadata, - sender = sender, - senderClient = senderClient, - conversation = tUnqualified lcnv, - priority = mmNativePriority metadata, - push = mmNativePush metadata, - transient = mmTransient metadata, - recipients = UserClientMap rcpts - } - let rpc = void $ fedQueueClient @'OnMessageSentTag rm - enqueueNotification domain Q.Persistent rpc +sendRemoteMessages domain now sender senderClient lcnv metadata messages = + -- FUTUREWORK: a FederationError here just means that queueing did not work. + -- It should not result in clients ending up in failedToSend. + (handle <=< runError) $ do + let rcpts = + foldr + (\((u, c), t) -> Map.insertWith (<>) u (Map.singleton c t)) + mempty + (Map.assocs messages) + rm = + RemoteMessage + { time = now, + _data = mmData metadata, + sender = sender, + senderClient = senderClient, + conversation = tUnqualified lcnv, + priority = mmNativePriority metadata, + push = mmNativePush metadata, + transient = mmTransient metadata, + recipients = UserClientMap rcpts + } + enqueueNotification Q.Persistent domain (fedQueueClient @'OnMessageSentTag rm) where handle :: Either FederationError a -> Sem r (Set (UserId, ClientId)) handle (Right _) = pure mempty diff --git a/services/galley/src/Galley/API/Teams.hs b/services/galley/src/Galley/API/Teams.hs index 95272e6a832..f8a44b29b0a 100644 --- a/services/galley/src/Galley/API/Teams.hs +++ b/services/galley/src/Galley/API/Teams.hs @@ -122,7 +122,6 @@ import Polysemy.Input import Polysemy.Output import Polysemy.TinyLog qualified as P import SAML2.WebSSO qualified as SAML -import System.Logger (Msg) import System.Logger qualified as Log import Wire.API.Conversation (ConversationRemoveMembers (..)) import Wire.API.Conversation.Role (wireConvRoles) @@ -885,6 +884,7 @@ deleteTeamMember :: Member BrigAccess r, Member ConversationStore r, Member (Error AuthenticationError) r, + Member (Error FederationError) r, Member (Error InvalidInput) r, Member (ErrorS 'AccessDenied) r, Member (ErrorS 'TeamMemberNotFound) r, @@ -913,6 +913,7 @@ deleteNonBindingTeamMember :: Member BrigAccess r, Member ConversationStore r, Member (Error AuthenticationError) r, + Member (Error FederationError) r, Member (Error InvalidInput) r, Member (ErrorS 'AccessDenied) r, Member (ErrorS 'TeamMemberNotFound) r, @@ -942,6 +943,7 @@ deleteTeamMember' :: Member ConversationStore r, Member (Error AuthenticationError) r, Member (Error InvalidInput) r, + Member (Error FederationError) r, Member (ErrorS 'AccessDenied) r, Member (ErrorS 'TeamMemberNotFound) r, Member (ErrorS 'TeamNotFound) r, @@ -1009,9 +1011,9 @@ uncheckedDeleteTeamMember :: ( Member BackendNotificationQueueAccess r, Member ConversationStore r, Member NotificationSubsystem r, + Member (Error FederationError) r, Member ExternalAccess r, Member (Input UTCTime) r, - Member (P.Logger (Log.Msg -> Log.Msg)) r, Member MemberStore r, Member TeamStore r ) => @@ -1059,10 +1061,10 @@ removeFromConvsAndPushConvLeaveEvent :: forall r. ( Member BackendNotificationQueueAccess r, Member ConversationStore r, + Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member (P.Logger (Log.Msg -> Log.Msg)) r, Member MemberStore r, Member TeamStore r ) => @@ -1149,8 +1151,7 @@ deleteTeamConversation :: Member NotificationSubsystem r, Member (Input UTCTime) r, Member SubConversationStore r, - Member TeamStore r, - Member (P.Logger (Msg -> Msg)) r + Member TeamStore r ) => Local UserId -> ConnId -> diff --git a/services/galley/src/Galley/API/Update.hs b/services/galley/src/Galley/API/Update.hs index ddc89b92164..809f0376188 100644 --- a/services/galley/src/Galley/API/Update.hs +++ b/services/galley/src/Galley/API/Update.hs @@ -120,7 +120,6 @@ import Polysemy import Polysemy.Error import Polysemy.Input import Polysemy.TinyLog -import System.Logger (Msg) import Wire.API.Conversation hiding (Member) import Wire.API.Conversation.Action import Wire.API.Conversation.Code @@ -400,8 +399,7 @@ updateConversationMessageTimer :: Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, - Member (Input UTCTime) r, - Member (Logger (Msg -> Msg)) r + Member (Input UTCTime) r ) => Local UserId -> ConnId -> @@ -433,8 +431,7 @@ updateConversationMessageTimerUnqualified :: Member (Error FederationError) r, Member ExternalAccess r, Member NotificationSubsystem r, - Member (Input UTCTime) r, - Member (Logger (Msg -> Msg)) r + Member (Input UTCTime) r ) => Local UserId -> ConnId -> @@ -460,8 +457,7 @@ deleteLocalConversation :: Member MemberStore r, Member ProposalStore r, Member (Input UTCTime) r, - Member TeamStore r, - Member (Logger (Msg -> Msg)) r + Member TeamStore r ) => Local UserId -> ConnId -> @@ -723,6 +719,7 @@ joinConversationByReusableCode :: Member BrigAccess r, Member CodeStore r, Member ConversationStore r, + Member (Error FederationError) r, Member (ErrorS 'CodeNotFound) r, Member (ErrorS 'InvalidConversationPassword) r, Member (ErrorS 'ConvAccessDenied) r, @@ -737,8 +734,7 @@ joinConversationByReusableCode :: Member (Input UTCTime) r, Member MemberStore r, Member TeamStore r, - Member TeamFeatureStore r, - Member (Logger (Msg -> Msg)) r + Member TeamFeatureStore r ) => Local UserId -> ConnId -> @@ -755,6 +751,7 @@ joinConversationById :: ( Member BackendNotificationQueueAccess r, Member BrigAccess r, Member ConversationStore r, + Member (Error FederationError) r, Member (ErrorS 'ConvAccessDenied) r, Member (ErrorS 'ConvNotFound) r, Member (ErrorS 'InvalidOperation) r, @@ -765,8 +762,7 @@ joinConversationById :: Member (Input Opts) r, Member (Input UTCTime) r, Member MemberStore r, - Member TeamStore r, - Member (Logger (Msg -> Msg)) r + Member TeamStore r ) => Local UserId -> ConnId -> @@ -780,6 +776,7 @@ joinConversation :: forall r. ( Member BackendNotificationQueueAccess r, Member BrigAccess r, + Member (Error FederationError) r, Member (ErrorS 'ConvAccessDenied) r, Member (ErrorS 'InvalidOperation) r, Member (ErrorS 'NotATeamMember) r, @@ -789,8 +786,7 @@ joinConversation :: Member (Input Opts) r, Member (Input UTCTime) r, Member MemberStore r, - Member TeamStore r, - Member (Logger (Msg -> Msg)) r + Member TeamStore r ) => Local UserId -> ConnId -> @@ -1017,8 +1013,7 @@ updateOtherMemberLocalConv :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member MemberStore r, - Member (Logger (Msg -> Msg)) r + Member MemberStore r ) => Local ConvId -> Local UserId -> @@ -1044,8 +1039,7 @@ updateOtherMemberUnqualified :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member MemberStore r, - Member (Logger (Msg -> Msg)) r + Member MemberStore r ) => Local UserId -> ConnId -> @@ -1070,8 +1064,7 @@ updateOtherMember :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member MemberStore r, - Member (Logger (Msg -> Msg)) r + Member MemberStore r ) => Local UserId -> ConnId -> @@ -1402,7 +1395,6 @@ updateConversationName :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member (Logger (Msg -> Msg)) r, Member TeamStore r ) => Local UserId -> @@ -1429,7 +1421,6 @@ updateUnqualifiedConversationName :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member (Logger (Msg -> Msg)) r, Member TeamStore r ) => Local UserId -> @@ -1452,7 +1443,6 @@ updateLocalConversationName :: Member ExternalAccess r, Member NotificationSubsystem r, Member (Input UTCTime) r, - Member (Logger (Msg -> Msg)) r, Member TeamStore r ) => Local UserId -> diff --git a/services/galley/src/Galley/API/Util.hs b/services/galley/src/Galley/API/Util.hs index b4759ef59e2..0c8d4df22fb 100644 --- a/services/galley/src/Galley/API/Util.hs +++ b/services/galley/src/Galley/API/Util.hs @@ -45,6 +45,7 @@ import Galley.Data.Conversation qualified as Data import Galley.Data.Services (BotMember, newBotMember) import Galley.Data.Types qualified as DataTypes import Galley.Effects +import Galley.Effects.BackendNotificationQueueAccess import Galley.Effects.BrigAccess import Galley.Effects.CodeStore import Galley.Effects.ConversationStore @@ -60,6 +61,7 @@ import Galley.Types.Teams import Galley.Types.UserList import Gundeck.Types.Push.V2 qualified as PushV2 import Imports hiding (forkIO) +import Network.AMQP qualified as Q import Network.HTTP.Types import Network.Wai import Network.Wai.Predicate hiding (Error, fromEither) @@ -823,12 +825,12 @@ ensureNoUnreachableBackends results = do throw (UnreachableBackends (map (tDomain . fst) errors)) pure values --- | Notify remote users of being added to a new conversation. In case a remote --- domain is unreachable, an exception is thrown, the conversation deleted and --- the client gets an error response. +-- | Notify remote users of being added to a new conversation. registerRemoteConversationMemberships :: ( Member ConversationStore r, Member (Error UnreachableBackends) r, + Member (Error FederationError) r, + Member BackendNotificationQueueAccess r, Member FederatorAccess r ) => -- | The time stamp when the conversation was created @@ -861,6 +863,7 @@ registerRemoteConversationMemberships now lusr lc = deleteOnUnreachable $ do -- reachable members in buckets per remote domain let joined :: [Remote [RemoteMember]] = allRemoteBuckets + joinedCoupled :: [Remote ([RemoteMember], NonEmpty (Remote UserId))] joinedCoupled = foldMap ( \ruids -> @@ -869,14 +872,12 @@ registerRemoteConversationMemberships now lusr lc = deleteOnUnreachable $ do filter (\r -> tDomain r /= tDomain ruids) joined in case NE.nonEmpty nj of Nothing -> [] - Just v -> [(ruids, v)] + Just v -> [fmap (,v) ruids] ) joined - void . (ensureNoUnreachableBackends =<<) $ - -- Send an update to remotes about the final list of participants - runFederatedConcurrentlyBucketsEither joinedCoupled $ - fedClient @'Galley @"on-conversation-updated" . convUpdateJoin + void $ enqueueNotificationsConcurrentlyBuckets Q.Persistent joinedCoupled $ \z -> + makeConversationUpdateBundle (convUpdateJoin z) >>= sendBundle where creator :: Maybe UserId creator = cnvmCreator . DataTypes.convMetadata . tUnqualified $ lc @@ -893,14 +894,14 @@ registerRemoteConversationMemberships now lusr lc = deleteOnUnreachable $ do toMembers :: [RemoteMember] -> Set OtherMember toMembers rs = Set.fromList $ localNonCreators <> fmap remoteMemberToOther rs - convUpdateJoin :: (QualifiedWithTag t [RemoteMember], NonEmpty (QualifiedWithTag t' UserId)) -> ConversationUpdate - convUpdateJoin (toNotify, newMembers) = + convUpdateJoin :: Remote ([RemoteMember], NonEmpty (Remote UserId)) -> ConversationUpdate + convUpdateJoin (tUnqualified -> (toNotify, newMembers)) = ConversationUpdate - { cuTime = now, - cuOrigUserId = tUntagged lusr, - cuConvId = DataTypes.convId (tUnqualified lc), - cuAlreadyPresentUsers = fmap (tUnqualified . rmId) . tUnqualified $ toNotify, - cuAction = + { time = now, + origUserId = tUntagged lusr, + convId = DataTypes.convId (tUnqualified lc), + alreadyPresentUsers = fmap (tUnqualified . rmId) toNotify, + action = SomeConversationAction (sing @'ConversationJoinTag) -- FUTUREWORK(md): replace the member role with whatever is provided in diff --git a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs index bdefa146314..9c2fe5d4004 100644 --- a/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs +++ b/services/galley/src/Galley/Effects/BackendNotificationQueueAccess.hs @@ -1,11 +1,10 @@ -{-# LANGUAGE TemplateHaskell #-} - module Galley.Effects.BackendNotificationQueueAccess where import Data.Qualified import Imports import Network.AMQP qualified as Q import Polysemy +import Polysemy.Error import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Component import Wire.API.Federation.Error @@ -13,8 +12,8 @@ import Wire.API.Federation.Error data BackendNotificationQueueAccess m a where EnqueueNotification :: KnownComponent c => - Remote x -> Q.DeliveryMode -> + Remote x -> FedQueueClient c a -> BackendNotificationQueueAccess m (Either FederationError a) EnqueueNotificationsConcurrently :: @@ -23,5 +22,49 @@ data BackendNotificationQueueAccess m a where f (Remote x) -> (Remote [x] -> FedQueueClient c a) -> BackendNotificationQueueAccess m (Either FederationError [Remote a]) + EnqueueNotificationsConcurrentlyBuckets :: + (KnownComponent c, Foldable f, Functor f) => + Q.DeliveryMode -> + f (Remote x) -> + (Remote x -> FedQueueClient c a) -> + BackendNotificationQueueAccess m (Either FederationError [Remote a]) + +enqueueNotification :: + ( KnownComponent c, + Member (Error FederationError) r, + Member BackendNotificationQueueAccess r + ) => + Q.DeliveryMode -> + Remote x -> + FedQueueClient c a -> + Sem r a +enqueueNotification m r q = send (EnqueueNotification m r q) >>= either throw pure + +enqueueNotificationsConcurrently :: + ( KnownComponent c, + Foldable f, + Functor f, + Member (Error FederationError) r, + Member BackendNotificationQueueAccess r + ) => + Q.DeliveryMode -> + f (Remote x) -> + (Remote [x] -> FedQueueClient c a) -> + Sem r [Remote a] +enqueueNotificationsConcurrently m r q = + send (EnqueueNotificationsConcurrently m r q) + >>= either throw pure -makeSem ''BackendNotificationQueueAccess +enqueueNotificationsConcurrentlyBuckets :: + ( KnownComponent c, + Foldable f, + Functor f, + Member (Error FederationError) r, + Member BackendNotificationQueueAccess r + ) => + Q.DeliveryMode -> + f (Remote x) -> + (Remote x -> FedQueueClient c a) -> + Sem r [Remote a] +enqueueNotificationsConcurrentlyBuckets m r q = + send (EnqueueNotificationsConcurrentlyBuckets m r q) >>= either throw pure diff --git a/services/galley/src/Galley/Effects/FederatorAccess.hs b/services/galley/src/Galley/Effects/FederatorAccess.hs index 8afd28cb842..cfa3b508c76 100644 --- a/services/galley/src/Galley/Effects/FederatorAccess.hs +++ b/services/galley/src/Galley/Effects/FederatorAccess.hs @@ -63,11 +63,11 @@ data FederatorAccess m a where -- already in buckets. The buckets are paired with arbitrary data that affect -- the payload of the request for each remote backend. RunFederatedConcurrentlyBucketsEither :: - forall (c :: Component) a m x y. - (KnownComponent c) => - [(Remote [x], y)] -> - ((Remote [x], y) -> FederatorClient c a) -> - FederatorAccess m [Either (Remote [x], FederationError) (Remote a)] + forall (c :: Component) f a m x. + (KnownComponent c, Foldable f) => + f (Remote x) -> + (Remote x -> FederatorClient c a) -> + FederatorAccess m [Either (Remote x, FederationError) (Remote a)] IsFederationConfigured :: FederatorAccess m Bool makeSem ''FederatorAccess diff --git a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs index 316a94dcce7..cefe3cdc1e4 100644 --- a/services/galley/src/Galley/Intra/BackendNotificationQueue.hs +++ b/services/galley/src/Galley/Intra/BackendNotificationQueue.hs @@ -28,10 +28,12 @@ interpretBackendNotificationQueueAccess :: Sem (BackendNotificationQueueAccess ': r) a -> Sem r a interpretBackendNotificationQueueAccess = interpret $ \case - EnqueueNotification remote deliveryMode action -> do - embedApp . runExceptT $ enqueueNotification (tDomain remote) deliveryMode action + EnqueueNotification deliveryMode remote action -> do + embedApp . runExceptT $ enqueueNotification deliveryMode (tDomain remote) action EnqueueNotificationsConcurrently m xs rpc -> do embedApp . runExceptT $ enqueueNotificationsConcurrently m xs rpc + EnqueueNotificationsConcurrentlyBuckets m xs rpc -> do + embedApp . runExceptT $ enqueueNotificationsConcurrentlyBuckets m xs rpc getChannel :: ExceptT FederationError App (MVar Q.Channel) getChannel = view rabbitmqChannel >>= maybe (throwE FederationNotConfigured) pure @@ -61,8 +63,8 @@ enqueueSingleNotification remoteDomain deliveryMode chanVar action = do Just chan -> do liftIO $ enqueue chan rid ownDomain remoteDomain deliveryMode action -enqueueNotification :: Domain -> Q.DeliveryMode -> FedQueueClient c a -> ExceptT FederationError App a -enqueueNotification remoteDomain deliveryMode action = do +enqueueNotification :: Q.DeliveryMode -> Domain -> FedQueueClient c a -> ExceptT FederationError App a +enqueueNotification deliveryMode remoteDomain action = do chanVar <- getChannel lift $ enqueueSingleNotification remoteDomain deliveryMode chanVar action @@ -78,6 +80,18 @@ enqueueNotificationsConcurrently m xs f = do qualifyAs r <$> enqueueSingleNotification (tDomain r) m chanVar (f r) +enqueueNotificationsConcurrentlyBuckets :: + (Foldable f) => + Q.DeliveryMode -> + f (Remote x) -> + (Remote x -> FedQueueClient c a) -> + ExceptT FederationError App [Remote a] +enqueueNotificationsConcurrentlyBuckets m xs f = do + chanVar <- getChannel + lift $ pooledForConcurrentlyN 8 (toList xs) $ \r -> + qualifyAs r + <$> enqueueSingleNotification (tDomain r) m chanVar (f r) + data NoRabbitMqChannel = NoRabbitMqChannel deriving (Show) diff --git a/services/galley/src/Galley/Intra/Federator.hs b/services/galley/src/Galley/Intra/Federator.hs index 6e09422c98a..c1dd13bae16 100644 --- a/services/galley/src/Galley/Intra/Federator.hs +++ b/services/galley/src/Galley/Intra/Federator.hs @@ -102,9 +102,10 @@ runFederatedConcurrentlyEither xs rpc = bimap (r,) (qualifyAs r) <$> runFederatedEither r (rpc r) runFederatedConcurrentlyBucketsEither :: - [(Remote [a], y)] -> - ((Remote [a], y) -> FederatorClient c b) -> - App [Either (Remote [a], FederationError) (Remote b)] + Foldable f => + f (Remote x) -> + (Remote x -> FederatorClient c b) -> + App [Either (Remote x, FederationError) (Remote b)] runFederatedConcurrentlyBucketsEither xs rpc = - pooledForConcurrentlyN 8 xs $ \(r, v) -> - bimap (r,) (qualifyAs r) <$> runFederatedEither r (rpc (r, v)) + pooledForConcurrentlyN 8 (toList xs) $ \r -> + bimap (r,) (qualifyAs r) <$> runFederatedEither r (rpc r) diff --git a/services/galley/test/integration/API.hs b/services/galley/test/integration/API.hs index 2ac9f185e71..ccff779599f 100644 --- a/services/galley/test/integration/API.hs +++ b/services/galley/test/integration/API.hs @@ -132,7 +132,6 @@ tests s = test s "metrics" metrics, test s "fetch conversation by qualified ID (v2)" testGetConvQualifiedV2, test s "create Proteus conversation" postProteusConvOk, - test s "create conversation with remote users all reachable" (postConvWithRemoteUsersOk $ Set.fromList [rb1, rb2]), test s "create conversation with remote users some unreachable" (postConvWithUnreachableRemoteUsers $ Set.fromList [rb1, rb2, rb3, rb4]), test s "get empty conversations" getConvsOk, test s "get conversations by ids" getConvsOk2, @@ -242,7 +241,6 @@ tests s = test s "existing has password, requested has password - 409" postCodeWithPasswordExistsWithPasswordRequested ], test s "remove user with only local convs" removeUserNoFederation, - test s "remove user with local and remote convs" removeUser, test s "iUpsertOne2OneConversation" testAllOne2OneConversationRequests, test s "post message - reject if missing client" postMessageRejectIfMissingClients, test s "post message - client that is not in group doesn't receive message" postMessageClientNotInGroupDoesNotReceiveMsg, @@ -412,121 +410,6 @@ postConvWithUnreachableRemoteUsers rbs = do groupConvs WS.assertNoEvent (3 # Second) [wsAlice, wsAlex] -postConvWithRemoteUsersOk :: Set (Remote Backend) -> TestM () -postConvWithRemoteUsersOk rbs = do - c <- view tsCannon - (alice, qAlice) <- randomUserTuple - (alex, qAlex) <- randomUserTuple - (amy, qAmy) <- randomUserTuple - connectUsers alice (list1 alex [amy]) - (allRemotes, participatingRemotes) <- do - v <- forM (toList rbs) $ \rb -> do - users <- connectBackend alice rb - pure (users, participating rb users) - pure $ foldr (\(a, p) acc -> bimap ((<>) a) ((<>) p) acc) ([], []) v - liftIO $ - assertBool "Not every backend is reachable in the test" (allRemotes == participatingRemotes) - - let convName = "some chat" - otherLocals = [qAlex, qAmy] - WS.bracketR3 c alice alex amy $ \(wsAlice, wsAlex, wsAmy) -> do - let joiners = allRemotes <> otherLocals - unreachableBackends = - Set.fromList $ - foldMap - ( \rb -> - guard (rbReachable rb == BackendUnreachable) - $> tDomain rb - ) - rbs - (rsp, federatedRequests) <- - withTempMockFederator' - ( asum - [ getNotFullyConnectedBackendsMock, - mockUnreachableFor unreachableBackends, - "on-conversation-created" ~> EmptyResponse, - "on-conversation-updated" ~> EmptyResponse - ] - ) - $ postConvQualified - alice - Nothing - defNewProteusConv - { newConvName = checked convName, - newConvQualifiedUsers = joiners - } - minimalShouldBePresent) - qcid <- - assertConv - rsp - RegularConv - (Just alice) - qAlice - (otherLocals <> participatingRemotes) - (Just convName) - Nothing - let cid = qUnqualified qcid - cvs <- mapM (convView qcid) [alice, alex, amy] - liftIO $ - mapM_ WS.assertSuccess - =<< Async.mapConcurrently (checkWs qAlice) (zip cvs [wsAlice, wsAlex, wsAmy]) - - liftIO $ do - let expectedReqs = - Set.fromList $ - [ "on-conversation-created", - "on-conversation-updated" - ] - in assertBool "Some federated calls are missing" $ - expectedReqs `Set.isSubsetOf` Set.fromList (frRPC <$> federatedRequests) - - -- assertions on the conversation.create event triggering federation request - let fedReqsCreated = filter (\r -> frRPC r == "on-conversation-created") federatedRequests - fedReqCreatedBodies <- for fedReqsCreated $ assertRight . parseFedRequest - forM_ fedReqCreatedBodies $ \(fedReqCreatedBody :: ConversationCreated ConvId) -> liftIO $ do - fedReqCreatedBody.origUserId @?= alice - fedReqCreatedBody.cnvId @?= cid - fedReqCreatedBody.cnvType @?= RegularConv - fedReqCreatedBody.cnvAccess @?= [InviteAccess] - fedReqCreatedBody.cnvAccessRoles - @?= Set.fromList [TeamMemberAccessRole, NonTeamMemberAccessRole, ServiceAccessRole] - fedReqCreatedBody.cnvName @?= Just convName - assertBool "Notifying an incorrect set of conversation members" $ - minimalShouldBePresentSet `Set.isSubsetOf` fedReqCreatedBody.nonCreatorMembers - fedReqCreatedBody.messageTimer @?= Nothing - fedReqCreatedBody.receiptMode @?= Nothing - - -- assertions on the conversation.member-join event triggering federation request - let fedReqsAdd = filter (\r -> frRPC r == "on-conversation-updated") federatedRequests - fedReqAddBodies <- for fedReqsAdd $ assertRight . parseFedRequest - forM_ fedReqAddBodies $ \(fedReqAddBody :: ConversationUpdate) -> liftIO $ do - fedReqAddBody.cuOrigUserId @?= qAlice - fedReqAddBody.cuConvId @?= cid - -- This remote backend must already have their users in the conversation, - -- otherwise they should not be receiving the conversation update message - assertBool "The list of already present users should be non-empty" - . not - . null - $ fedReqAddBody.cuAlreadyPresentUsers - case fedReqAddBody.cuAction of - SomeConversationAction SConversationJoinTag _action -> pure () - _ -> assertFailure @() "Unexpected update action" - where - toOtherMember qid = OtherMember qid Nothing roleNameWireAdmin - convView cnv usr = - responseJsonError =<< getConvQualified usr cnv do - ntfTransient n @?= False - let e = List1.head (WS.unpackPayload n) - evtConv e @?= cnvQualifiedId cnv - evtType e @?= ConvCreate - evtFrom e @?= qalice - case evtData e of - EdConversation c' -> assertConvEquals cnv c' - _ -> assertFailure "Unexpected event data" - -- @SF.Separation @TSFI.RESTfulAPI @S2 -- This test verifies whether a message actually gets sent all the way to -- cannon. @@ -1867,11 +1750,11 @@ paginateConvListIds = do conv <- randomId let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = qChad, - cuConvId = conv, - cuAlreadyPresentUsers = [], - cuAction = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) + { time = now, + origUserId = qChad, + convId = conv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient chadDomain cu @@ -1883,11 +1766,11 @@ paginateConvListIds = do conv <- randomId let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = qDee, - cuConvId = conv, - cuAlreadyPresentUsers = [], - cuAction = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) + { time = now, + origUserId = qDee, + convId = conv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient deeDomain cu @@ -1928,11 +1811,11 @@ paginateConvListIdsPageEndingAtLocalsAndDomain = do conv <- randomId let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = qChad, - cuConvId = conv, - cuAlreadyPresentUsers = [], - cuAction = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) + { time = now, + origUserId = qChad, + convId = conv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient chadDomain cu @@ -1946,11 +1829,11 @@ paginateConvListIdsPageEndingAtLocalsAndDomain = do conv <- randomId let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = qDee, - cuConvId = conv, - cuAlreadyPresentUsers = [], - cuAction = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) + { time = now, + origUserId = qDee, + convId = conv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient deeDomain cu @@ -3204,11 +3087,11 @@ putRemoteConvMemberOk update = do now <- liftIO getCurrentTime let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = qbob, - cuConvId = qUnqualified qconv, - cuAlreadyPresentUsers = [], - cuAction = + { time = now, + origUserId = qbob, + convId = qUnqualified qconv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qalice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient remoteDomain cu @@ -3349,11 +3232,11 @@ putRemoteReceiptModeOk = do now <- liftIO getCurrentTime let cuAddAlice = ConversationUpdate - { cuTime = now, - cuOrigUserId = qbob, - cuConvId = qUnqualified qconv, - cuAlreadyPresentUsers = [], - cuAction = + { time = now, + origUserId = qbob, + convId = qUnqualified qconv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qalice) roleNameWireAdmin) } void $ runFedClient @"on-conversation-updated" fedGalleyClient remoteDomain cuAddAlice @@ -3364,11 +3247,11 @@ putRemoteReceiptModeOk = do connectWithRemoteUser adam qbob let cuAddAdam = ConversationUpdate - { cuTime = now, - cuOrigUserId = qbob, - cuConvId = qUnqualified qconv, - cuAlreadyPresentUsers = [], - cuAction = + { time = now, + origUserId = qbob, + convId = qUnqualified qconv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qadam) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient remoteDomain cuAddAdam @@ -3377,11 +3260,11 @@ putRemoteReceiptModeOk = do let action = ConversationReceiptModeUpdate newReceiptMode let responseConvUpdate = ConversationUpdate - { cuTime = now, - cuOrigUserId = qalice, - cuConvId = qUnqualified qconv, - cuAlreadyPresentUsers = [adam], - cuAction = + { time = now, + origUserId = qalice, + convId = qUnqualified qconv, + alreadyPresentUsers = [adam], + action = SomeConversationAction (sing @'ConversationReceiptModeUpdateTag) action } let mockResponse = mockReply (ConversationUpdateResponseUpdate responseConvUpdate) @@ -3553,137 +3436,6 @@ removeUserNoFederation = do (mems3 >>= other bob) @?= Nothing (mems3 >>= other carl) @?= Just (OtherMember carl Nothing roleNameWireAdmin) -removeUser :: TestM () -removeUser = do - c <- view tsCannon - [alice, alexDel, amy] <- replicateM 3 randomQualifiedUser - let [alice', alexDel', amy'] = qUnqualified <$> [alice, alexDel, amy] - - let bDomain = Domain "b.example.com" - bart <- randomQualifiedId bDomain - berta <- randomQualifiedId bDomain - - let cDomain = Domain "c.example.com" - carl <- randomQualifiedId cDomain - - let dDomain = Domain "d.example.com" - dwight <- randomQualifiedId dDomain - dory <- randomQualifiedId dDomain - - connectUsers alice' (list1 alexDel' [amy']) - connectWithRemoteUser alice' bart - connectWithRemoteUser alice' berta - connectWithRemoteUser alexDel' bart - connectWithRemoteUser alice' carl - connectWithRemoteUser alexDel' carl - connectWithRemoteUser alice' dwight - connectWithRemoteUser alexDel' dory - - qconvA1 <- decodeQualifiedConvId <$> postConv alice' [alexDel'] (Just "gossip") [] Nothing Nothing - qconvA2 <- decodeQualifiedConvId <$> postConvWithRemoteUsers alice' Nothing defNewProteusConv {newConvQualifiedUsers = [alexDel, amy, berta, dwight]} - qconvA3 <- decodeQualifiedConvId <$> postConv alice' [amy'] (Just "gossip3") [] Nothing Nothing - qconvA4 <- decodeQualifiedConvId <$> postConvWithRemoteUsers alice' Nothing defNewProteusConv {newConvQualifiedUsers = [alexDel, bart, carl]} - convB1 <- randomId -- a remote conversation at 'bDomain' that Alice, AlexDel and Bart will be in - convB2 <- randomId -- a remote conversation at 'bDomain' that AlexDel and Bart will be in - convC1 <- randomId -- a remote conversation at 'cDomain' that AlexDel and Carl will be in - convD1 <- randomId -- a remote conversation at 'cDomain' that AlexDel and Dory will be in - now <- liftIO getCurrentTime - fedGalleyClient <- view tsFedGalleyClient - let nc cid creator quids = - ConversationCreated - { time = now, - origUserId = qUnqualified creator, - cnvId = cid, - cnvType = RegularConv, - cnvAccess = [], - cnvAccessRoles = Set.fromList [], - cnvName = Just "gossip4", - nonCreatorMembers = Set.fromList $ createOtherMember <$> quids, - messageTimer = Nothing, - receiptMode = Nothing, - protocol = ProtocolProteus - } - void $ runFedClient @"on-conversation-created" fedGalleyClient bDomain $ nc convB1 bart [alice, alexDel] - void $ runFedClient @"on-conversation-created" fedGalleyClient bDomain $ nc convB2 bart [alexDel] - void $ runFedClient @"on-conversation-created" fedGalleyClient cDomain $ nc convC1 carl [alexDel] - void $ runFedClient @"on-conversation-created" fedGalleyClient dDomain $ nc convD1 dory [alexDel] - - WS.bracketR3 c alice' alexDel' amy' $ \(wsAlice, wsAlexDel, wsAmy) -> do - let handler = do - d <- frTargetDomain <$> getRequest - asum - [ do - guard (d == dDomain) - throw (DiscoveryFailureSrvNotAvailable "dDomain"), - do - guard (d `elem` [bDomain, cDomain]) - "leave-conversation" ~> LeaveConversationResponse (Right mempty) - ] - (_, fedRequests) <- - withTempMockFederator' handler $ - deleteUser alexDel' !!! const 200 === statusCode - - liftIO $ do - assertEqual ("expect exactly 4 federated requests in : " <> show fedRequests) 4 (length fedRequests) - - liftIO $ do - WS.assertMatchN_ (5 # Second) [wsAlice, wsAlexDel] $ - wsAssertMembersLeave qconvA1 alexDel [alexDel] - WS.assertMatchN_ (5 # Second) [wsAlice, wsAlexDel, wsAmy] $ - wsAssertMembersLeave qconvA2 alexDel [alexDel] - - liftIO $ do - let bConvUpdateRPCs = filter (matchFedRequest bDomain "on-conversation-updated") fedRequests - bConvUpdates <- mapM (assertRight . eitherDecode . frBody) bConvUpdateRPCs - - bConvUpdatesA2 <- assertOne $ filter (\cu -> cuConvId cu == qUnqualified qconvA2) bConvUpdates - cuOrigUserId bConvUpdatesA2 @?= alexDel - cuAction bConvUpdatesA2 @?= SomeConversationAction (sing @'ConversationLeaveTag) () - cuAlreadyPresentUsers bConvUpdatesA2 @?= [qUnqualified berta] - - bConvUpdatesA4 <- assertOne $ filter (\cu -> cuConvId cu == qUnqualified qconvA4) bConvUpdates - cuOrigUserId bConvUpdatesA4 @?= alexDel - cuAction bConvUpdatesA4 @?= SomeConversationAction (sing @'ConversationLeaveTag) () - cuAlreadyPresentUsers bConvUpdatesA4 @?= [qUnqualified bart] - - liftIO $ do - cConvUpdateRPC <- assertOne $ filter (matchFedRequest cDomain "on-conversation-updated") fedRequests - Right convUpdate <- pure . eitherDecode . frBody $ cConvUpdateRPC - cuConvId convUpdate @?= qUnqualified qconvA4 - cuOrigUserId convUpdate @?= alexDel - cuAction convUpdate @?= SomeConversationAction (sing @'ConversationLeaveTag) () - cuAlreadyPresentUsers convUpdate @?= [qUnqualified carl] - - liftIO $ do - dConvUpdateRPC <- assertOne $ filter (matchFedRequest dDomain "on-conversation-updated") fedRequests - Right convUpdate <- pure . eitherDecode . frBody $ dConvUpdateRPC - cuConvId convUpdate @?= qUnqualified qconvA2 - cuOrigUserId convUpdate @?= alexDel - cuAction convUpdate @?= SomeConversationAction (sing @'ConversationLeaveTag) () - cuAlreadyPresentUsers convUpdate @?= [qUnqualified dwight] - - -- Check memberships - mems1 <- fmap cnvMembers . responseJsonError =<< getConvQualified alice' qconvA1 - mems2 <- fmap cnvMembers . responseJsonError =<< getConvQualified alice' qconvA2 - mems3 <- fmap cnvMembers . responseJsonError =<< getConvQualified alice' qconvA3 - mems4 <- fmap cnvMembers . responseJsonError =<< getConvQualified alice' qconvA4 - let findOther u = find ((== u) . omQualifiedId) . cmOthers - liftIO $ do - findOther alexDel mems1 @?= Nothing - findOther alexDel mems2 @?= Nothing - findOther amy mems2 @?= Just (OtherMember amy Nothing roleNameWireAdmin) - findOther alexDel mems3 @?= Nothing - findOther amy mems3 @?= Just (OtherMember amy Nothing roleNameWireAdmin) - findOther alexDel mems4 @?= Nothing - where - createOtherMember :: Qualified UserId -> OtherMember - createOtherMember quid = - OtherMember - { omQualifiedId = quid, - omService = Nothing, - omConvRoleName = roleNameWireAdmin - } - testAllOne2OneConversationRequests :: TestM () testAllOne2OneConversationRequests = do for_ [LocalActor, RemoteActor] $ \actor -> diff --git a/services/galley/test/integration/API/Federation.hs b/services/galley/test/integration/API/Federation.hs index 070010d0867..e6bd8eea883 100644 --- a/services/galley/test/integration/API/Federation.hs +++ b/services/galley/test/integration/API/Federation.hs @@ -241,11 +241,11 @@ addLocalUser = do now <- liftIO getCurrentTime let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qbob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [charlie], - FedGalley.cuAction = + { FedGalley.time = now, + FedGalley.origUserId = qbob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [charlie], + FedGalley.action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qalice :| [qdee]) roleNameWireMember) } WS.bracketRN c [alice, charlie, dee] $ \[wsA, wsC, wsD] -> do @@ -295,15 +295,15 @@ addUnconnectedUsersOnly = do -- Bob attempts to add unconnected Charlie (possible abuse) let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qBob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [alice], - FedGalley.cuAction = + { FedGalley.time = now, + FedGalley.origUserId = qBob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [alice], + FedGalley.action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qCharlie :| []) roleNameWireMember) } -- Alice receives no notifications from this - void $ runFedClient @"on-conversation-updated" fedGalleyClient remoteDomain cu + void $ runFedClient @("on-conversation-updated") fedGalleyClient remoteDomain cu WS.assertNoEvent (5 # Second) [wsA] -- | This test invokes the federation endpoint: @@ -329,20 +329,20 @@ removeLocalUser = do now <- liftIO getCurrentTime let cuAdd = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qBob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [], - FedGalley.cuAction = + { FedGalley.time = now, + FedGalley.origUserId = qBob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [], + FedGalley.action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qAlice) roleNameWireMember) } cuRemove = FedGalley.ConversationUpdate - { FedGalley.cuTime = addUTCTime (secondsToNominalDiffTime 5) now, - FedGalley.cuOrigUserId = qAlice, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [alice], - FedGalley.cuAction = + { FedGalley.time = addUTCTime (secondsToNominalDiffTime 5) now, + FedGalley.origUserId = qAlice, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [alice], + FedGalley.action = SomeConversationAction (sing @'ConversationLeaveTag) () } @@ -402,11 +402,11 @@ removeRemoteUser = do let cuRemove user = FedGalley.ConversationUpdate - { FedGalley.cuTime = addUTCTime (secondsToNominalDiffTime 5) now, - FedGalley.cuOrigUserId = qBob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [alice, charlie, dee], - FedGalley.cuAction = + { FedGalley.time = addUTCTime (secondsToNominalDiffTime 5) now, + FedGalley.origUserId = qBob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [alice, charlie, dee], + FedGalley.action = SomeConversationAction (sing @'ConversationRemoveMembersTag) (ConversationRemoveMembers (pure user) EdReasonRemoved) @@ -457,11 +457,11 @@ notifyUpdate extras action etype edata = do now <- liftIO getCurrentTime let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qbob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [alice, charlie], - FedGalley.cuAction = action + { FedGalley.time = now, + FedGalley.origUserId = qbob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [alice, charlie], + FedGalley.action = action } WS.bracketR2 c alice charlie $ \(wsA, wsC) -> do void $ runFedClient @"on-conversation-updated" fedGalleyClient bdom cu @@ -499,11 +499,11 @@ notifyUpdateUnavailable extras action etype edata = do now <- liftIO getCurrentTime let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qbob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [alice, charlie], - FedGalley.cuAction = action + { FedGalley.time = now, + FedGalley.origUserId = qbob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [alice, charlie], + FedGalley.action = action } WS.bracketR2 c alice charlie $ \(wsA, wsC) -> do ((), _fedRequests) <- @@ -635,11 +635,11 @@ notifyDeletedConversation = do now <- liftIO getCurrentTime let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qbob, - FedGalley.cuConvId = qUnqualified qconv, - FedGalley.cuAlreadyPresentUsers = [alice], - FedGalley.cuAction = SomeConversationAction (sing @'ConversationDeleteTag) () + { FedGalley.time = now, + FedGalley.origUserId = qbob, + FedGalley.convId = qUnqualified qconv, + FedGalley.alreadyPresentUsers = [alice], + FedGalley.action = SomeConversationAction (sing @'ConversationDeleteTag) () } void $ runFedClient @"on-conversation-updated" fedGalleyClient bobDomain cu @@ -691,11 +691,11 @@ addRemoteUser = do -- The conversation owning let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qbob, - FedGalley.cuConvId = qUnqualified qconv, - FedGalley.cuAlreadyPresentUsers = map qUnqualified [qalice, qcharlie], - FedGalley.cuAction = + { FedGalley.time = now, + FedGalley.origUserId = qbob, + FedGalley.convId = qUnqualified qconv, + FedGalley.alreadyPresentUsers = map qUnqualified [qalice, qcharlie], + FedGalley.action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (qdee :| [qeve, qflo]) roleNameWireMember) } WS.bracketRN c (map qUnqualified [qalice, qcharlie, qdee, qflo]) $ \[wsA, wsC, wsD, wsF] -> do @@ -774,11 +774,11 @@ onMessageSent = do connectWithRemoteUser alice qbob let cu = FedGalley.ConversationUpdate - { FedGalley.cuTime = now, - FedGalley.cuOrigUserId = qbob, - FedGalley.cuConvId = conv, - FedGalley.cuAlreadyPresentUsers = [], - FedGalley.cuAction = + { FedGalley.time = now, + FedGalley.origUserId = qbob, + FedGalley.convId = conv, + FedGalley.alreadyPresentUsers = [], + FedGalley.action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qalice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient bdom cu diff --git a/services/galley/test/integration/API/MLS.hs b/services/galley/test/integration/API/MLS.hs index 3429d03a1bc..d998d891fc7 100644 --- a/services/galley/test/integration/API/MLS.hs +++ b/services/galley/test/integration/API/MLS.hs @@ -880,11 +880,11 @@ testRemoteToRemoteInSub = do connectWithRemoteUser alice qbob let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = qbob, - cuConvId = conv, - cuAlreadyPresentUsers = [], - cuAction = + { time = now, + origUserId = qbob, + convId = conv, + alreadyPresentUsers = [], + action = SomeConversationAction (sing @'ConversationJoinTag) (ConversationJoin (pure qalice) roleNameWireMember) } void $ runFedClient @"on-conversation-updated" fedGalleyClient bdom cu diff --git a/services/galley/test/integration/API/MLS/Util.hs b/services/galley/test/integration/API/MLS/Util.hs index 8cc9b51c601..d979556e596 100644 --- a/services/galley/test/integration/API/MLS/Util.hs +++ b/services/galley/test/integration/API/MLS/Util.hs @@ -965,11 +965,11 @@ receiveOnConvUpdated conv origUser joiner = do now <- liftIO getCurrentTime let cu = ConversationUpdate - { cuTime = now, - cuOrigUserId = origUser, - cuConvId = qUnqualified conv, - cuAlreadyPresentUsers = [qUnqualified joiner], - cuAction = + { time = now, + origUserId = origUser, + convId = qUnqualified conv, + alreadyPresentUsers = [qUnqualified joiner], + action = SomeConversationAction SConversationJoinTag ConversationJoin diff --git a/services/galley/test/integration/API/Util.hs b/services/galley/test/integration/API/Util.hs index b6227da8967..4de8b00e8df 100644 --- a/services/galley/test/integration/API/Util.hs +++ b/services/galley/test/integration/API/Util.hs @@ -2676,11 +2676,13 @@ withTempMockFederator' :: m b -> m (b, [FederatedRequest]) withTempMockFederator' resp action = do - let mock = runMock (assertFailure . Text.unpack) $ do - r <- resp - pure ("application" // "json", r) + let mock = + def + { handler = runMock (assertFailure . Text.unpack) $ do + r <- resp + pure ("application" // "json", r) + } Mock.withTempMockFederator - [("Content-Type", "application/json")] mock $ \mockPort -> do withSettingsOverrides (\opts -> opts & Opts.federator ?~ Endpoint "127.0.0.1" (fromIntegral mockPort)) action diff --git a/services/galley/test/integration/TestSetup.hs b/services/galley/test/integration/TestSetup.hs index 2cdb594af24..d4d8c7151b0 100644 --- a/services/galley/test/integration/TestSetup.hs +++ b/services/galley/test/integration/TestSetup.hs @@ -55,7 +55,6 @@ import Data.ByteString.Conversion import Data.Domain import Data.Proxy import Data.Text qualified as Text -import GHC.TypeLits import Galley.Aws qualified as Aws import Galley.Options (Opts) import Imports @@ -141,7 +140,7 @@ instance VersionedMonad v ClientM where guardVersion _ = pure () runFedClient :: - forall (name :: Symbol) comp m api. + forall name comp m api. ( HasUnsafeFedEndpoint comp api name, Servant.HasClient Servant.ClientM api, MonadIO m,