diff --git a/changelog.d/3-bug-fixes/WPB-6258 b/changelog.d/3-bug-fixes/WPB-6258 new file mode 100644 index 00000000000..2513b3c396e --- /dev/null +++ b/changelog.d/3-bug-fixes/WPB-6258 @@ -0,0 +1 @@ +Send connection cancelled event to local pending connection when user gets deleted diff --git a/integration/test/Notifications.hs b/integration/test/Notifications.hs index 9ea53706223..0fdd7df49a2 100644 --- a/integration/test/Notifications.hs +++ b/integration/test/Notifications.hs @@ -113,6 +113,12 @@ isConvDeleteNotif n = fieldEquals n "payload.0.type" "conversation.delete" isTeamMemberLeaveNotif :: MakesValue a => a -> App Bool isTeamMemberLeaveNotif n = nPayload n %. "type" `isEqual` "team.member-leave" +isConnectionNotif :: MakesValue a => String -> a -> App Bool +isConnectionNotif status n = + (&&) + <$> nPayload n %. "type" `isEqual` "user.connection" + <*> nPayload n %. "connection.status" `isEqual` status + assertLeaveNotification :: ( HasCallStack, MakesValue fromUser, diff --git a/integration/test/Test/Connection.hs b/integration/test/Test/Connection.hs index 0852552c1d4..f982df677d4 100644 --- a/integration/test/Test/Connection.hs +++ b/integration/test/Test/Connection.hs @@ -19,6 +19,7 @@ module Test.Connection where import API.Brig (getConnection, postConnection, putConnection) import API.BrigInternal import API.Galley +import Notifications import SetupHelpers import Testlib.Prelude import UnliftIO.Async (forConcurrently_) @@ -401,3 +402,14 @@ testFederationAllowMixedConnectWithRemote = connectTwoUsers alice bob where defSearchPolicy = "full_search" + +testPendingConnectionUserDeleted :: HasCallStack => Domain -> App () +testPendingConnectionUserDeleted bobsDomain = do + alice <- randomUser OwnDomain def + bob <- randomUser bobsDomain def + + withWebSockets [bob] $ \[bobWs] -> do + void $ postConnection alice bob >>= getBody 201 + void $ awaitMatch (isConnectionNotif "pending") bobWs + void $ deleteUser alice + void $ awaitMatch (isConnectionNotif "cancelled") bobWs diff --git a/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs b/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs index 7507d69c7d0..3856abcd9d0 100644 --- a/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs +++ b/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs @@ -36,6 +36,7 @@ import Data.Id import Data.Qualified import Data.Range import Imports +import Wire.API.Connection (UserConnection) import Wire.API.Team.Member (HardTruncationLimit, TeamMember) import qualified Wire.Sem.Paging as E @@ -97,6 +98,8 @@ type instance E.PagingBounds CassandraPaging TeamMember = Range 1 HardTruncation type instance E.PagingBounds InternalPaging TeamId = Range 1 100 Int32 +type instance E.PagingBounds InternalPaging (Remote UserConnection) = Range 1 1000 Int32 + instance E.Paging InternalPaging where pageItems (InternalPage (_, _, items)) = items pageHasMore (InternalPage (p, _, _)) = hasMore p diff --git a/libs/types-common/src/Data/Range.hs b/libs/types-common/src/Data/Range.hs index 0ad0a3e2c14..d7a92f08d11 100644 --- a/libs/types-common/src/Data/Range.hs +++ b/libs/types-common/src/Data/Range.hs @@ -98,7 +98,7 @@ import Test.QuickCheck qualified as QC newtype Range (n :: Nat) (m :: Nat) a = Range { fromRange :: a } - deriving (Eq, Ord, Show) + deriving (Eq, Ord, Show, Functor) toRange :: (n <= x, x <= m, KnownNat x, Num a) => Proxy x -> Range n m a toRange = Range . fromIntegral . natVal diff --git a/libs/wire-api/src/Wire/API/Routes/Internal/Brig.hs b/libs/wire-api/src/Wire/API/Routes/Internal/Brig.hs index 4a52bf64aa7..7e9e76ff581 100644 --- a/libs/wire-api/src/Wire/API/Routes/Internal/Brig.hs +++ b/libs/wire-api/src/Wire/API/Routes/Internal/Brig.hs @@ -173,6 +173,7 @@ type AccountAPI = "createUserNoVerify" ( "users" :> MakesFederatedCall 'Brig "on-user-deleted-connections" + :> MakesFederatedCall 'Brig "send-connection-action" :> ReqBody '[Servant.JSON] NewUser :> MultiVerb 'POST '[Servant.JSON] RegisterInternalResponses (Either RegisterError SelfProfile) ) @@ -181,6 +182,7 @@ type AccountAPI = ( "users" :> "spar" :> MakesFederatedCall 'Brig "on-user-deleted-connections" + :> MakesFederatedCall 'Brig "send-connection-action" :> ReqBody '[Servant.JSON] NewUserSpar :> MultiVerb 'POST '[Servant.JSON] CreateUserSparInternalResponses (Either CreateUserSparError SelfProfile) ) @@ -679,6 +681,7 @@ type AuthAPI = "legalhold-login" ( "legalhold-login" :> MakesFederatedCall 'Brig "on-user-deleted-connections" + :> MakesFederatedCall 'Brig "send-connection-action" :> ReqBody '[JSON] LegalHoldLogin :> MultiVerb1 'POST '[JSON] TokenResponse ) @@ -686,6 +689,7 @@ type AuthAPI = "sso-login" ( "sso-login" :> MakesFederatedCall 'Brig "on-user-deleted-connections" + :> MakesFederatedCall 'Brig "send-connection-action" :> ReqBody '[JSON] SsoLogin :> QueryParam' [Optional, Strict] "persist" Bool :> MultiVerb1 'POST '[JSON] TokenResponse diff --git a/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs b/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs index ada615249cb..15b07451e10 100644 --- a/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs +++ b/libs/wire-api/src/Wire/API/Routes/Public/Brig.hs @@ -316,6 +316,7 @@ type SelfAPI = \password, it must be provided. if password is correct, or if neither \ \a verified identity nor a password exists, account deletion \ \is scheduled immediately." + :> MakesFederatedCall 'Brig "send-connection-action" :> CanThrow 'InvalidUser :> CanThrow 'InvalidCode :> CanThrow 'BadCredentials @@ -333,6 +334,7 @@ type SelfAPI = Named "put-self" ( Summary "Update your profile." + :> MakesFederatedCall 'Brig "send-connection-action" :> ZUser :> ZConn :> "self" @@ -358,6 +360,7 @@ type SelfAPI = :> Description "Your phone number can only be removed if you also have an \ \email address and a password." + :> MakesFederatedCall 'Brig "send-connection-action" :> ZUser :> ZConn :> "self" @@ -373,6 +376,7 @@ type SelfAPI = :> Description "Your email address can only be removed if you also have a \ \phone number." + :> MakesFederatedCall 'Brig "send-connection-action" :> ZUser :> ZConn :> "self" @@ -405,6 +409,7 @@ type SelfAPI = :<|> Named "change-locale" ( Summary "Change your locale." + :> MakesFederatedCall 'Brig "send-connection-action" :> ZUser :> ZConn :> "self" @@ -415,6 +420,8 @@ type SelfAPI = :<|> Named "change-handle" ( Summary "Change your handle." + :> MakesFederatedCall 'Brig "send-connection-action" + :> MakesFederatedCall 'Brig "send-connection-action" :> ZUser :> ZConn :> "self" @@ -477,6 +484,7 @@ type AccountAPI = "If the environment where the registration takes \ \place is private and a registered email address or phone \ \number is not whitelisted, a 403 error is returned." + :> MakesFederatedCall 'Brig "send-connection-action" :> "register" :> ReqBody '[JSON] NewUserPublic :> MultiVerb 'POST '[JSON] RegisterResponses (Either RegisterError RegisterSuccess) @@ -487,6 +495,7 @@ type AccountAPI = :<|> Named "verify-delete" ( Summary "Verify account deletion with a code." + :> MakesFederatedCall 'Brig "send-connection-action" :> CanThrow 'InvalidCode :> "delete" :> ReqBody '[JSON] VerifyDeleteUser @@ -498,6 +507,7 @@ type AccountAPI = :<|> Named "get-activate" ( Summary "Activate (i.e. confirm) an email address or phone number." + :> MakesFederatedCall 'Brig "send-connection-action" :> Description "See also 'POST /activate' which has a larger feature set." :> CanThrow 'UserKeyExists :> CanThrow 'InvalidActivationCodeWrongUser @@ -524,6 +534,7 @@ type AccountAPI = :> Description "Activation only succeeds once and the number of \ \failed attempts for a valid key is limited." + :> MakesFederatedCall 'Brig "send-connection-action" :> CanThrow 'UserKeyExists :> CanThrow 'InvalidActivationCodeWrongUser :> CanThrow 'InvalidActivationCodeWrongCode @@ -728,6 +739,7 @@ type UserClientAPI = Named "add-client" ( Summary "Register a new client" + :> MakesFederatedCall 'Brig "send-connection-action" :> CanThrow 'TooManyClients :> CanThrow 'MissingAuth :> CanThrow 'MalformedPrekeys @@ -1334,6 +1346,7 @@ type AuthAPI = \ Every other combination is invalid.\ \ Access tokens can be given as query parameter or authorisation\ \ header, with the latter being preferred." + :> MakesFederatedCall 'Brig "send-connection-action" :> QueryParam "client_id" ClientId :> Cookies '["zuid" ::: SomeUserToken] :> Bearer SomeAccessToken @@ -1364,6 +1377,7 @@ type AuthAPI = ( "login" :> Summary "Authenticate a user to obtain a cookie and first access token" :> Description "Logins are throttled at the server's discretion" + :> MakesFederatedCall 'Brig "send-connection-action" :> ReqBody '[JSON] Login :> QueryParam' [ Optional, diff --git a/services/brig/brig.cabal b/services/brig/brig.cabal index 5c86b737589..6158288d854 100644 --- a/services/brig/brig.cabal +++ b/services/brig/brig.cabal @@ -125,6 +125,8 @@ library Brig.Effects.BlacklistStore.Cassandra Brig.Effects.CodeStore Brig.Effects.CodeStore.Cassandra + Brig.Effects.ConnectionStore + Brig.Effects.ConnectionStore.Cassandra Brig.Effects.FederationConfigStore Brig.Effects.FederationConfigStore.Cassandra Brig.Effects.GalleyProvider diff --git a/services/brig/src/Brig/API/Auth.hs b/services/brig/src/Brig/API/Auth.hs index 6b0d93aa56e..889a12d9b40 100644 --- a/services/brig/src/Brig/API/Auth.hs +++ b/services/brig/src/Brig/API/Auth.hs @@ -24,6 +24,7 @@ import Brig.API.User import Brig.App import Brig.Data.User qualified as User import Brig.Effects.BlacklistStore +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.GalleyProvider import Brig.Options import Brig.User.Auth qualified as Auth @@ -37,12 +38,14 @@ import Data.List1 (List1 (..)) import Data.Qualified import Data.Text qualified as T import Data.Text.Lazy qualified as LT +import Data.Time.Clock (UTCTime) import Data.ZAuth.Token qualified as ZAuth import Imports import Network.HTTP.Types import Network.Wai.Utilities ((!>>)) import Network.Wai.Utilities.Error qualified as Wai import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog (TinyLog) import Wire.API.User import Wire.API.User.Auth hiding (access) @@ -50,11 +53,15 @@ import Wire.API.User.Auth.LegalHold import Wire.API.User.Auth.ReAuth import Wire.API.User.Auth.Sso import Wire.NotificationSubsystem +import Wire.Sem.Paging.Cassandra (InternalPaging) accessH :: ( Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Maybe ClientId -> [Either Text SomeUserToken] -> @@ -70,7 +77,10 @@ access :: ( TokenPair u a, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Maybe ClientId -> NonEmpty (Token u) -> @@ -90,7 +100,10 @@ login :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Login -> Maybe Bool -> @@ -150,7 +163,10 @@ legalHoldLogin :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => LegalHoldLogin -> Handler r SomeAccess @@ -162,7 +178,10 @@ legalHoldLogin lhl = do ssoLogin :: ( Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => SsoLogin -> Maybe Bool -> diff --git a/services/brig/src/Brig/API/Client.hs b/services/brig/src/Brig/API/Client.hs index 529b81ad0e9..948bd3f2a6c 100644 --- a/services/brig/src/Brig/API/Client.hs +++ b/services/brig/src/Brig/API/Client.hs @@ -53,6 +53,7 @@ import Brig.App import Brig.Data.Client qualified as Data import Brig.Data.Nonce as Nonce import Brig.Data.User qualified as Data +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.GalleyProvider (GalleyProvider) import Brig.Effects.GalleyProvider qualified as GalleyProvider import Brig.Effects.JwtTools (JwtTools) @@ -86,10 +87,12 @@ import Data.Map.Strict qualified as Map import Data.Misc (PlainTextPassword6) import Data.Qualified import Data.Set qualified as Set +import Data.Time.Clock (UTCTime) import Imports import Network.HTTP.Types.Method (StdMethod) import Network.Wai.Utilities import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog import Servant (Link, ToHttpApiData (toUrlPiece)) import System.Logger.Class (field, msg, val, (~~)) @@ -110,6 +113,7 @@ import Wire.NotificationSubsystem import Wire.Sem.Concurrency import Wire.Sem.FromUTC (FromUTC (fromUTCTime)) import Wire.Sem.Now as Now +import Wire.Sem.Paging.Cassandra (InternalPaging) lookupLocalClient :: UserId -> ClientId -> (AppT r) (Maybe Client) lookupLocalClient uid = wrapClient . Data.lookupClient uid @@ -158,7 +162,10 @@ addClient :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe ConnId -> @@ -173,7 +180,10 @@ addClientWithReAuthPolicy :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Data.ReAuthPolicy -> UserId -> @@ -475,7 +485,10 @@ pubClient c = legalHoldClientRequested :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> LegalHoldClientRequest -> @@ -493,7 +506,10 @@ legalHoldClientRequested targetUser (LegalHoldClientRequest _requester lastPreke removeLegalHoldClient :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> AppT r () diff --git a/services/brig/src/Brig/API/Internal.hs b/services/brig/src/Brig/API/Internal.hs index f811894c335..659db42be15 100644 --- a/services/brig/src/Brig/API/Internal.hs +++ b/services/brig/src/Brig/API/Internal.hs @@ -42,6 +42,7 @@ import Brig.Data.User qualified as Data import Brig.Effects.BlacklistPhonePrefixStore (BlacklistPhonePrefixStore) import Brig.Effects.BlacklistStore (BlacklistStore) import Brig.Effects.CodeStore (CodeStore) +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.FederationConfigStore (AddFederationRemoteResult (..), AddFederationRemoteTeamResult (..), FederationConfigStore, UpdateFederationResult (..)) import Brig.Effects.FederationConfigStore qualified as E import Brig.Effects.GalleyProvider (GalleyProvider) @@ -71,11 +72,13 @@ import Data.Id as Id import Data.Map.Strict qualified as Map import Data.Qualified import Data.Set qualified as Set +import Data.Time.Clock (UTCTime) import Data.Time.Clock.System import Imports hiding (head) import Network.Wai.Routing hiding (toList) import Network.Wai.Utilities as Utilities import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog (TinyLog) import Servant hiding (Handler, JSON, addHeader, respond) import Servant.OpenApi.Internal.Orphans () @@ -98,6 +101,7 @@ import Wire.API.User.Client import Wire.API.User.RichInfo import Wire.NotificationSubsystem import Wire.Sem.Concurrency +import Wire.Sem.Paging.Cassandra (InternalPaging) --------------------------------------------------------------------------- -- Sitemap (servant) @@ -114,7 +118,10 @@ servantSitemap :: Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member TinyLog r, - Member (Concurrency 'Unsafe) r + Member (Concurrency 'Unsafe) r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ServerT BrigIRoutes.API (Handler r) servantSitemap = @@ -157,7 +164,10 @@ accountAPI :: Member (UserPendingActivationStore p) r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ServerT BrigIRoutes.AccountAPI (Handler r) accountAPI = @@ -201,7 +211,10 @@ teamsAPI :: Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member (Concurrency 'Unsafe) r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ServerT BrigIRoutes.TeamsAPI (Handler r) teamsAPI = @@ -226,7 +239,10 @@ authAPI :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ServerT BrigIRoutes.AuthAPI (Handler r) authAPI = @@ -370,7 +386,10 @@ addClientInternalH :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe Bool -> @@ -386,7 +405,10 @@ addClientInternalH usr mSkipReAuth new connId = do legalHoldClientRequestedH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> LegalHoldClientRequest -> @@ -397,7 +419,10 @@ legalHoldClientRequestedH targetUser clientRequest = do removeLegalHoldClientH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> (Handler r) NoContent @@ -419,7 +444,10 @@ createUserNoVerify :: Member (UserPendingActivationStore p) r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => NewUser -> (Handler r) (Either RegisterError SelfProfile) @@ -440,7 +468,10 @@ createUserNoVerifySpar :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => NewUserSpar -> (Handler r) (Either CreateUserSparError SelfProfile) @@ -461,7 +492,10 @@ createUserNoVerifySpar uData = deleteUserNoAuthH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> (Handler r) DeleteUserResponse @@ -577,7 +611,10 @@ getPasswordResetCode emailOrPhone = changeAccountStatusH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> AccountStatusUpdate -> @@ -622,7 +659,10 @@ getConnectionsStatus (ConnectionsStatusRequestV2 froms mtos mrel) = do revokeIdentityH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Maybe Email -> Maybe Phone -> @@ -685,7 +725,10 @@ addPhonePrefixH prefix = lift $ NoContent <$ API.phonePrefixInsert prefix updateSSOIdH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> UserSSOId -> @@ -701,7 +744,10 @@ updateSSOIdH uid ssoid = do deleteSSOIdH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> (Handler r) UpdateSSOIdResponse @@ -766,7 +812,10 @@ updateHandleH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> HandleUpdate -> @@ -780,7 +829,10 @@ updateUserNameH :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> NameUpdate -> diff --git a/services/brig/src/Brig/API/Public.hs b/services/brig/src/Brig/API/Public.hs index 8beab24c7d3..a2ab94c1c61 100644 --- a/services/brig/src/Brig/API/Public.hs +++ b/services/brig/src/Brig/API/Public.hs @@ -48,6 +48,7 @@ import Brig.Data.UserKey qualified as UserKey import Brig.Effects.BlacklistPhonePrefixStore (BlacklistPhonePrefixStore) import Brig.Effects.BlacklistStore (BlacklistStore) import Brig.Effects.CodeStore (CodeStore) +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.FederationConfigStore (FederationConfigStore) import Brig.Effects.GalleyProvider (GalleyProvider) import Brig.Effects.GalleyProvider qualified as GalleyProvider @@ -94,6 +95,7 @@ import Data.Schema () import Data.Text qualified as Text import Data.Text.Ascii qualified as Ascii import Data.Text.Lazy (pack) +import Data.Time.Clock (UTCTime) import Data.ZAuth.Token qualified as ZAuth import FileEmbedLzma import Galley.Types.Teams (HiddenPerm (..), hasPermission) @@ -101,6 +103,7 @@ import Imports hiding (head) import Network.Socket (PortNumber) import Network.Wai.Utilities as Utilities import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog (TinyLog) import Servant hiding (Handler, JSON, addHeader, respond) import Servant qualified @@ -155,6 +158,7 @@ import Wire.NotificationSubsystem import Wire.Sem.Concurrency import Wire.Sem.Jwk (Jwk) import Wire.Sem.Now (Now) +import Wire.Sem.Paging.Cassandra (InternalPaging) -- User API ----------------------------------------------------------- @@ -276,7 +280,10 @@ servantSitemap :: Member FederationConfigStore r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ServerT BrigAPI (Handler r) servantSitemap = @@ -563,7 +570,10 @@ addClient :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -686,7 +696,10 @@ createUser :: Member (UserPendingActivationStore p) r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Public.NewUserPublic -> (Handler r) (Either Public.RegisterError Public.RegisterSuccess) @@ -878,7 +891,10 @@ updateUser :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -905,7 +921,10 @@ changePhone u _ (Public.puPhone -> phone) = lift . exceptTToMaybe $ do removePhone :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -916,7 +935,10 @@ removePhone self conn = removeEmail :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -933,7 +955,10 @@ changePassword u cp = lift . exceptTToMaybe $ API.changePassword u cp changeLocale :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -944,7 +969,10 @@ changeLocale u conn l = lift $ API.changeLocale u conn l changeSupportedProtocols :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Local UserId -> ConnId -> @@ -988,7 +1016,10 @@ changeHandle :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -1174,7 +1205,10 @@ deleteSelfUser :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Public.DeleteUser -> @@ -1185,7 +1219,10 @@ deleteSelfUser u body = do verifyDeleteUser :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Public.VerifyDeleteUser -> Handler r () @@ -1226,7 +1263,10 @@ activate :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Public.ActivationKey -> Public.ActivationCode -> @@ -1240,7 +1280,10 @@ activateKey :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Public.Activate -> (Handler r) ActivationRespWithStatus diff --git a/services/brig/src/Brig/API/User.hs b/services/brig/src/Brig/API/User.hs index bd5c84d555c..d3d7e096ef2 100644 --- a/services/brig/src/Brig/API/User.hs +++ b/services/brig/src/Brig/API/User.hs @@ -115,6 +115,7 @@ import Brig.Effects.BlacklistStore (BlacklistStore) import Brig.Effects.BlacklistStore qualified as BlacklistStore import Brig.Effects.CodeStore (CodeStore) import Brig.Effects.CodeStore qualified as E +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.GalleyProvider import Brig.Effects.GalleyProvider qualified as GalleyProvider import Brig.Effects.PasswordResetStore (PasswordResetStore) @@ -157,12 +158,13 @@ import Data.Map.Strict qualified as Map import Data.Metrics qualified as Metrics import Data.Misc import Data.Qualified -import Data.Time.Clock (addUTCTime, diffUTCTime) +import Data.Time.Clock (UTCTime, addUTCTime, diffUTCTime) import Data.UUID.V4 (nextRandom) import Galley.Types.Teams qualified as Team import Imports hiding (cs) import Network.Wai.Utilities import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog (TinyLog) import Polysemy.TinyLog qualified as Log import System.Logger.Class (MonadLogger) @@ -189,6 +191,7 @@ import Wire.API.User.Password import Wire.API.User.RichInfo import Wire.NotificationSubsystem import Wire.Sem.Concurrency +import Wire.Sem.Paging.Cassandra (InternalPaging) data AllowSCIMUpdates = AllowSCIMUpdates @@ -232,7 +235,10 @@ createUserSpar :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => NewUserSpar -> ExceptT CreateUserSparError (AppT r) CreateUserResult @@ -302,7 +308,10 @@ createUser :: Member (UserPendingActivationStore p) r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => NewUser -> ExceptT RegisterError (AppT r) CreateUserResult @@ -592,7 +601,10 @@ updateUser :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe ConnId -> @@ -623,7 +635,10 @@ updateUser uid mconn uu allowScim = do changeLocale :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -639,7 +654,10 @@ changeLocale uid conn (LocaleUpdate loc) = do changeManagedBy :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -655,7 +673,10 @@ changeManagedBy uid conn (ManagedByUpdate mb) = do changeSupportedProtocols :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -672,7 +693,10 @@ changeHandle :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe ConnId -> @@ -840,7 +864,10 @@ changePhone u phone = do removeEmail :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -861,7 +888,10 @@ removeEmail uid conn = do removePhone :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> ConnId -> @@ -887,7 +917,10 @@ revokeIdentity :: forall r. ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Either Email Phone -> AppT r () @@ -930,7 +963,10 @@ changeAccountStatus :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, Member (Concurrency 'Unsafe) r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => List1 UserId -> AccountStatus -> @@ -950,7 +986,10 @@ changeAccountStatus usrs status = do changeSingleAccountStatus :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> AccountStatus -> @@ -980,7 +1019,10 @@ activate :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ActivationTarget -> ActivationCode -> @@ -993,7 +1035,10 @@ activateWithCurrency :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ActivationTarget -> ActivationCode -> @@ -1037,7 +1082,10 @@ preverify tgt code = do onActivated :: ( Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => ActivationEvent -> (AppT r) (UserId, Maybe UserIdentity, Bool) @@ -1265,7 +1313,10 @@ deleteSelfUser :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe PlainTextPassword6 -> @@ -1346,7 +1397,10 @@ deleteSelfUser uid pwd = do verifyDeleteUser :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => VerifyDeleteUser -> ExceptT DeleteUserError (AppT r) () @@ -1364,7 +1418,10 @@ verifyDeleteUser d = do ensureAccountDeleted :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> AppT r DeleteUserResult @@ -1404,7 +1461,10 @@ ensureAccountDeleted uid = do deleteAccount :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserAccount -> Sem r () diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs index 80c938c5d06..1b41a473806 100644 --- a/services/brig/src/Brig/App.hs +++ b/services/brig/src/Brig/App.hs @@ -70,6 +70,7 @@ module Brig.App AppT (..), viewFederationDomain, qualifyLocal, + qualifyLocal', -- * Crutches that should be removed once Brig has been completely @@ -139,6 +140,7 @@ import OpenSSL.Session (SSLOption (..)) import OpenSSL.Session qualified as SSL import Polysemy import Polysemy.Final +import Polysemy.Input (Input, input) import Ropes.Nexmo qualified as Nexmo import Ropes.Twilio qualified as Twilio import Ssl.Util @@ -599,3 +601,6 @@ viewFederationDomain = view (settings . Opt.federationDomain) qualifyLocal :: (MonadReader Env m) => a -> m (Local a) qualifyLocal a = toLocalUnsafe <$> viewFederationDomain <*> pure a + +qualifyLocal' :: (Member (Input (Local ()))) r => a -> Sem r (Local a) +qualifyLocal' a = flip toLocalUnsafe a . tDomain <$> input diff --git a/services/brig/src/Brig/CanonicalInterpreter.hs b/services/brig/src/Brig/CanonicalInterpreter.hs index c76802a40ae..9a77bbae6dc 100644 --- a/services/brig/src/Brig/CanonicalInterpreter.hs +++ b/services/brig/src/Brig/CanonicalInterpreter.hs @@ -7,6 +7,8 @@ import Brig.Effects.BlacklistStore (BlacklistStore) import Brig.Effects.BlacklistStore.Cassandra (interpretBlacklistStoreToCassandra) import Brig.Effects.CodeStore (CodeStore) import Brig.Effects.CodeStore.Cassandra (codeStoreToCassandra, interpretClientToIO) +import Brig.Effects.ConnectionStore (ConnectionStore) +import Brig.Effects.ConnectionStore.Cassandra (connectionStoreToCassandra) import Brig.Effects.FederationConfigStore (FederationConfigStore) import Brig.Effects.FederationConfigStore.Cassandra (interpretFederationDomainConfig, remotesMapFromCfgFile) import Brig.Effects.GalleyProvider (GalleyProvider) @@ -18,16 +20,20 @@ import Brig.Effects.PublicKeyBundle import Brig.Effects.UserPendingActivationStore (UserPendingActivationStore) import Brig.Effects.UserPendingActivationStore.Cassandra (userPendingActivationStoreToCassandra) import Brig.Options (ImplicitNoFederationRestriction (federationDomainConfig), federationDomainConfigs, federationStrategy) +import Brig.Options qualified as Opt import Brig.RPC (ParseException) import Cassandra qualified as Cas import Control.Lens ((^.)) import Control.Monad.Catch (throwM) +import Data.Qualified (Local, toLocalUnsafe) +import Data.Time.Clock (UTCTime, getCurrentTime) import Imports -import Polysemy (Embed, Final, embedToFinal, runFinal) +import Polysemy (Embed, Final, embed, embedToFinal, runFinal) import Polysemy.Async import Polysemy.Conc import Polysemy.Embed (runEmbedded) import Polysemy.Error (Error, mapError, runError) +import Polysemy.Input (Input, runInputConst, runInputSem) import Polysemy.TinyLog (TinyLog) import Wire.GundeckAPIAccess import Wire.NotificationSubsystem @@ -43,7 +49,10 @@ import Wire.Sem.Now.IO (nowToIOAction) import Wire.Sem.Paging.Cassandra (InternalPaging) type BrigCanonicalEffects = - '[ NotificationSubsystem, + '[ ConnectionStore InternalPaging, + Input UTCTime, + Input (Local ()), + NotificationSubsystem, GundeckAPIAccess, FederationConfigStore, Jwk, @@ -98,6 +107,9 @@ runBrigToIO e (AppT ma) = do . interpretFederationDomainConfig (e ^. settings . federationStrategy) (foldMap (remotesMapFromCfgFile . fmap (.federationDomainConfig)) (e ^. settings . federationDomainConfigs)) . runGundeckAPIAccess (e ^. gundeckEndpoint) . runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig (e ^. requestId)) + . runInputConst (toLocalUnsafe (e ^. settings . Opt.federationDomain) ()) + . runInputSem (embed getCurrentTime) + . connectionStoreToCassandra ) ) $ runReaderT ma e diff --git a/services/brig/src/Brig/Data/Connection.hs b/services/brig/src/Brig/Data/Connection.hs index 16031d654eb..b624a7d9447 100644 --- a/services/brig/src/Brig/Data/Connection.hs +++ b/services/brig/src/Brig/Data/Connection.hs @@ -34,6 +34,7 @@ module Brig.Data.Connection lookupRemoteConnectionStatuses, lookupAllStatuses, lookupRemoteConnectedUsersC, + lookupRemoteConnectedUsersPaginated, countConnections, deleteConnections, deleteRemoteConnections, @@ -44,7 +45,6 @@ module Brig.Data.Connection remoteConnectionDelete, remoteConnectionSelectFromDomain, remoteConnectionClear, - remoteConnectionsSelectUsers, -- * Re-exports module T, @@ -268,10 +268,14 @@ lookupAllStatuses lfroms = do map (\(d, u, r) -> toConnectionStatusV2 from d u r) <$> retry x1 (query remoteRelationsSelectAll (params LocalQuorum (Identity from))) -lookupRemoteConnectedUsersC :: forall m. (MonadClient m) => UserId -> Int32 -> ConduitT () [Remote UserId] m () +lookupRemoteConnectedUsersC :: forall m. (MonadClient m) => Local UserId -> Int32 -> ConduitT () [Remote UserConnection] m () lookupRemoteConnectedUsersC u maxResults = - paginateC remoteConnectionsSelectUsers (paramsP LocalQuorum (Identity u) maxResults) x1 - .| C.map (map (uncurry toRemoteUnsafe)) + paginateC remoteConnectionSelect (paramsP LocalQuorum (Identity (tUnqualified u)) maxResults) x1 + .| C.map (\xs -> map (\x@(d, _, _, _, _, _) -> toRemoteUnsafe d (toRemoteUserConnection u x)) xs) + +lookupRemoteConnectedUsersPaginated :: MonadClient m => Local UserId -> Int32 -> m (Page (Remote UserConnection)) +lookupRemoteConnectedUsersPaginated u maxResults = do + (\x@(d, _, _, _, _, _) -> toRemoteUnsafe d (toRemoteUserConnection u x)) <$$> retry x1 (paginate remoteConnectionSelect (paramsP LocalQuorum (Identity (tUnqualified u)) maxResults)) -- | See 'lookupContactListWithRelation'. lookupContactList :: (MonadClient m) => UserId -> m [UserId] @@ -411,9 +415,6 @@ remoteRelationsSelect = "SELECT right_user, status FROM connection_remote WHERE remoteRelationsSelectAll :: PrepQuery R (Identity UserId) (Domain, UserId, RelationWithHistory) remoteRelationsSelectAll = "SELECT right_domain, right_user, status FROM connection_remote WHERE left = ?" -remoteConnectionsSelectUsers :: PrepQuery R (Identity UserId) (Domain, UserId) -remoteConnectionsSelectUsers = "SELECT right_domain, right_user FROM connection_remote WHERE left = ?" - -- Conversions toLocalUserConnection :: diff --git a/services/brig/src/Brig/Effects/ConnectionStore.hs b/services/brig/src/Brig/Effects/ConnectionStore.hs new file mode 100644 index 00000000000..013232d2686 --- /dev/null +++ b/services/brig/src/Brig/Effects/ConnectionStore.hs @@ -0,0 +1,35 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2024 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . +{-# LANGUAGE TemplateHaskell #-} + +module Brig.Effects.ConnectionStore where + +import Data.Id +import Data.Qualified (Local, Remote) +import Imports +import Polysemy +import Wire.API.Connection (UserConnection) +import Wire.Sem.Paging (Page, PagingBounds, PagingState) + +data ConnectionStore p m a where + RemoteConnectedUsersPaginated :: + Local UserId -> + Maybe (PagingState p (Remote UserConnection)) -> + PagingBounds p (Remote UserConnection) -> + ConnectionStore p m (Page p (Remote UserConnection)) + +makeSem ''ConnectionStore diff --git a/services/brig/src/Brig/Effects/ConnectionStore/Cassandra.hs b/services/brig/src/Brig/Effects/ConnectionStore/Cassandra.hs new file mode 100644 index 00000000000..35f2444ab88 --- /dev/null +++ b/services/brig/src/Brig/Effects/ConnectionStore/Cassandra.hs @@ -0,0 +1,41 @@ +{-# LANGUAGE DeepSubsumption #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2024 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Brig.Effects.ConnectionStore.Cassandra where + +import Brig.Data.Connection +import Brig.Effects.ConnectionStore +import Cassandra +import Data.Range +import Imports +import Polysemy +import Polysemy.Internal.Tactics +import Wire.Sem.Paging.Cassandra + +connectionStoreToCassandra :: + forall r a. + (Member (Embed Client) r) => + Sem (ConnectionStore InternalPaging ': r) a -> + Sem r a +connectionStoreToCassandra = + interpretH $ + liftT . embed @Client . \case + RemoteConnectedUsersPaginated uid mps bounds -> case mps of + Nothing -> flip mkInternalPage pure =<< lookupRemoteConnectedUsersPaginated uid (fromRange bounds) + Just ps -> ipNext ps diff --git a/services/brig/src/Brig/IO/Intra.hs b/services/brig/src/Brig/IO/Intra.hs index 61dc0c3e272..f81fd20f7a7 100644 --- a/services/brig/src/Brig/IO/Intra.hs +++ b/services/brig/src/Brig/IO/Intra.hs @@ -52,16 +52,16 @@ import Brig.API.Error (internalServerError) import Brig.API.Types import Brig.API.Util import Brig.App -import Brig.Data.Connection (lookupContactList) +import Brig.Data.Connection import Brig.Data.Connection qualified as Data -import Brig.Federation.Client (notifyUserDeleted) +import Brig.Effects.ConnectionStore (ConnectionStore) +import Brig.Effects.ConnectionStore qualified as E +import Brig.Federation.Client (notifyUserDeleted, sendConnectionAction) import Brig.IO.Journal qualified as Journal import Brig.RPC import Brig.Types.User.Event import Brig.User.Search.Index qualified as Search -import Cassandra (MonadClient) -import Conduit (runConduit, (.|)) -import Control.Error (ExceptT) +import Control.Error (ExceptT, runExceptT) import Control.Lens (view, (.~), (?~), (^.), (^?)) import Control.Monad.Catch import Control.Monad.Trans.Except (throwE) @@ -70,23 +70,22 @@ import Data.Aeson.KeyMap qualified as KeyMap import Data.Aeson.Lens import Data.ByteString.Conversion import Data.ByteString.Lazy qualified as BL -import Data.Conduit.List qualified as C import Data.Id -import Data.Json.Util ((#)) +import Data.Json.Util (toUTCTimeMillis, (#)) import Data.List.NonEmpty (NonEmpty (..)) import Data.List1 (List1, singleton) import Data.Proxy import Data.Qualified import Data.Range -import GHC.TypeLits +import Data.Time.Clock (UTCTime) import Gundeck.Types.Push.V2 (RecipientClients (RecipientClientsAll)) import Gundeck.Types.Push.V2 qualified as V2 import Imports import Network.HTTP.Types.Method import Network.HTTP.Types.Status import Polysemy +import Polysemy.Input (Input, input) import Polysemy.TinyLog (TinyLog) -import System.Logger.Class (MonadLogger) import System.Logger.Message hiding ((.=)) import Wire.API.Connection import Wire.API.Conversation hiding (Member) @@ -103,6 +102,8 @@ import Wire.API.User.Client import Wire.NotificationSubsystem import Wire.Rpc import Wire.Sem.Logger qualified as Log +import Wire.Sem.Paging qualified as P +import Wire.Sem.Paging.Cassandra (InternalPaging) ----------------------------------------------------------------------------- -- Event Handlers @@ -110,7 +111,10 @@ import Wire.Sem.Logger qualified as Log onUserEvent :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe ConnId -> @@ -228,7 +232,10 @@ journalEvent orig e = case e of dispatchNotifications :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe ConnId -> @@ -252,49 +259,103 @@ dispatchNotifications orig conn e = case e of -- n.b. Synchronously fetch the contact list on the current thread. -- If done asynchronously, the connections may already have been deleted. notifyUserDeletionLocals orig conn event - embed $ notifyUserDeletionRemotes orig + notifyUserDeletionRemotes orig where event = singleton $ UserEvent e notifyUserDeletionLocals :: + forall r. ( Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r ) => UserId -> Maybe ConnId -> List1 Event -> Sem r () notifyUserDeletionLocals deleted conn event = do - recipients <- (:|) deleted <$> embed (lookupContactList deleted) - notify event deleted V2.RouteDirect conn (pure recipients) + luid <- qualifyLocal' deleted + -- first we send a notification to the deleted user's devices + notify event deleted V2.RouteDirect conn (pure (deleted :| [])) + -- then to all their connections + connectionPages Nothing luid (toRange (Proxy @500)) + where + handler :: [UserConnection] -> Sem r () + handler connections = do + -- sent event to connections that are accepted + case qUnqualified . ucTo <$> filter ((==) Accepted . ucStatus) connections of + x : xs -> notify event deleted V2.RouteDirect conn (pure (x :| xs)) + [] -> pure () + -- also send a connection cancelled event to connections that are pending + d <- tDomain <$> input + forM_ + (filter ((==) Sent . ucStatus) connections) + ( \uc -> do + now <- toUTCTimeMillis <$> input + -- because the connections are going to be removed from the database anyway when a user gets deleted + -- we don't need to save the updated connection state in the database + -- note that we switch from and to users so that the "other" user becomes the recipient of the event + let ucCancelled = + UserConnection + (qUnqualified (ucTo uc)) + (Qualified (ucFrom uc) d) + Cancelled + now + (ucConvId uc) + let e = ConnectionUpdated ucCancelled Nothing Nothing + onConnectionEvent deleted conn e + ) + + connectionPages :: Maybe UserId -> Local UserId -> Range 1 500 Int32 -> Sem r () + connectionPages mbStart user pageSize = do + page <- embed $ Data.lookupLocalConnections user mbStart pageSize + case resultList page of + [] -> pure () + xs -> do + handler xs + when (Data.resultHasMore page) $ + connectionPages (Just (maximum (qUnqualified . ucTo <$> xs))) user pageSize notifyUserDeletionRemotes :: - forall m. - ( MonadReader Env m, - MonadClient m, - MonadLogger m, - MonadMask m + forall r. + ( Member (Embed HttpClientIO) r, + Member TinyLog r, + Member (Input (Local ())) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> - m () + Sem r () notifyUserDeletionRemotes deleted = do - runConduit $ - Data.lookupRemoteConnectedUsersC deleted (fromInteger (natVal (Proxy @UserDeletedNotificationMaxConnections))) - .| C.mapM_ fanoutNotifications + luid <- qualifyLocal' deleted + P.withChunks (\mps -> E.remoteConnectedUsersPaginated luid mps maxBound) fanoutNotifications where - fanoutNotifications :: [Remote UserId] -> m () + fanoutNotifications :: [Remote UserConnection] -> Sem r () fanoutNotifications = mapM_ notifyBackend . bucketRemote - notifyBackend :: Remote [UserId] -> m () - notifyBackend uids = do - case tUnqualified (checked <$> uids) of + notifyBackend :: Remote [UserConnection] -> Sem r () + notifyBackend ucs = do + case tUnqualified (checked <$> ucs) of Nothing -> -- The user IDs cannot be more than 1000, so we can assume the range -- check will only fail because there are 0 User Ids. pure () - Just rangedUids -> do - luidDeleted <- qualifyLocal deleted - notifyUserDeleted luidDeleted (qualifyAs uids rangedUids) + Just rangedUcs -> do + luidDeleted <- qualifyLocal' deleted + embed $ notifyUserDeleted luidDeleted (qualifyAs ucs ((fmap (fmap (qUnqualified . ucTo))) rangedUcs)) + -- also sent connection cancelled events to the connections that are pending + let remotePendingConnections = qualifyAs ucs <$> filter ((==) Sent . ucStatus) (fromRange rangedUcs) + forM_ remotePendingConnections $ sendCancelledEvent luidDeleted + + sendCancelledEvent :: Local UserId -> Remote UserConnection -> Sem r () + sendCancelledEvent luidDeleted ruc = do + embed (runExceptT (sendConnectionAction luidDeleted Nothing (qUnqualified . ucTo <$> ruc) RemoteRescind)) >>= \case + -- should we abort the whole process if we fail to send the event to a remote backend? + Left e -> + Log.err $ + field "error" (show e) + . msg (val "An error occurred while sending a connection cancelled event to a remote backend.") + Right _ -> pure () -- | (Asynchronously) notifies other users of events. notify :: diff --git a/services/brig/src/Brig/InternalEvent/Process.hs b/services/brig/src/Brig/InternalEvent/Process.hs index 9c04f5c3083..9bca2320e37 100644 --- a/services/brig/src/Brig/InternalEvent/Process.hs +++ b/services/brig/src/Brig/InternalEvent/Process.hs @@ -22,6 +22,7 @@ where import Brig.API.User qualified as API import Brig.App +import Brig.Effects.ConnectionStore import Brig.IO.Intra (rmClient) import Brig.IO.Intra qualified as Intra import Brig.InternalEvent.Types @@ -31,14 +32,18 @@ import Brig.Types.User.Event import Control.Lens (view) import Control.Monad.Catch import Data.ByteString.Conversion +import Data.Qualified (Local) +import Data.Time.Clock (UTCTime) import Imports import Polysemy import Polysemy.Conc +import Polysemy.Input (Input) import Polysemy.Time import Polysemy.TinyLog as Log import System.Logger.Class (field, msg, val, (~~)) import Wire.NotificationSubsystem import Wire.Sem.Delay +import Wire.Sem.Paging.Cassandra (InternalPaging) -- | Handle an internal event. -- @@ -48,7 +53,10 @@ onEvent :: Member NotificationSubsystem r, Member TinyLog r, Member Delay r, - Member Race r + Member Race r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => InternalNotification -> Sem r () diff --git a/services/brig/src/Brig/Team/API.hs b/services/brig/src/Brig/Team/API.hs index ac70c9623a1..a163240fd13 100644 --- a/services/brig/src/Brig/Team/API.hs +++ b/services/brig/src/Brig/Team/API.hs @@ -36,6 +36,7 @@ import Brig.Data.UserKey import Brig.Data.UserKey qualified as Data import Brig.Effects.BlacklistStore (BlacklistStore) import Brig.Effects.BlacklistStore qualified as BlacklistStore +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.GalleyProvider (GalleyProvider) import Brig.Effects.GalleyProvider qualified as GalleyProvider import Brig.Effects.UserPendingActivationStore (UserPendingActivationStore) @@ -53,11 +54,14 @@ import Control.Monad.Trans.Except (mapExceptT) import Data.ByteString.Conversion (toByteString, toByteString') import Data.Id import Data.List1 qualified as List1 +import Data.Qualified (Local) import Data.Range +import Data.Time.Clock (UTCTime) import Galley.Types.Teams qualified as Team import Imports hiding (head) import Network.Wai.Utilities hiding (code, message) import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog (TinyLog) import Servant hiding (Handler, JSON, addHeader) import System.Logger.Class qualified as Log @@ -81,6 +85,7 @@ import Wire.API.User hiding (fromEmail) import Wire.API.User qualified as Public import Wire.NotificationSubsystem import Wire.Sem.Concurrency +import Wire.Sem.Paging.Cassandra (InternalPaging) servantAPI :: ( Member BlacklistStore r, @@ -312,7 +317,10 @@ suspendTeam :: Member NotificationSubsystem r, Member (Concurrency 'Unsafe) r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => TeamId -> (Handler r) NoContent @@ -328,7 +336,10 @@ unsuspendTeam :: Member NotificationSubsystem r, Member (Concurrency 'Unsafe) r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => TeamId -> (Handler r) NoContent @@ -345,7 +356,10 @@ changeTeamAccountStatuses :: Member NotificationSubsystem r, Member (Concurrency 'Unsafe) r, Member GalleyProvider r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => TeamId -> AccountStatus -> diff --git a/services/brig/src/Brig/User/Auth.hs b/services/brig/src/Brig/User/Auth.hs index fece7d7c22c..d329843c74f 100644 --- a/services/brig/src/Brig/User/Auth.hs +++ b/services/brig/src/Brig/User/Auth.hs @@ -47,6 +47,7 @@ import Brig.Data.LoginCode qualified as Data import Brig.Data.User qualified as Data import Brig.Data.UserKey import Brig.Data.UserKey qualified as Data +import Brig.Effects.ConnectionStore (ConnectionStore) import Brig.Effects.GalleyProvider (GalleyProvider) import Brig.Effects.GalleyProvider qualified as GalleyProvider import Brig.Email @@ -68,10 +69,13 @@ import Data.List.NonEmpty qualified as NE import Data.List1 (List1) import Data.List1 qualified as List1 import Data.Misc (PlainTextPassword6) +import Data.Qualified (Local) +import Data.Time.Clock (UTCTime) import Data.ZAuth.Token qualified as ZAuth import Imports import Network.Wai.Utilities.Error ((!>>)) import Polysemy +import Polysemy.Input (Input) import Polysemy.TinyLog (TinyLog) import Polysemy.TinyLog qualified as Log import System.Logger (field, msg, val, (~~)) @@ -82,6 +86,7 @@ import Wire.API.User.Auth import Wire.API.User.Auth.LegalHold import Wire.API.User.Auth.Sso import Wire.NotificationSubsystem +import Wire.Sem.Paging.Cassandra (InternalPaging) sendLoginCode :: (Member TinyLog r) => @@ -128,7 +133,10 @@ login :: ( Member GalleyProvider r, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => Login -> CookieType -> @@ -237,7 +245,10 @@ renewAccess :: ( ZAuth.TokenPair u a, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => List1 (ZAuth.Token u) -> Maybe (ZAuth.Token a) -> @@ -270,7 +281,10 @@ revokeAccess u pw cc ll = do catchSuspendInactiveUser :: ( Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> e -> @@ -296,7 +310,10 @@ newAccess :: ( ZAuth.TokenPair u a, Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => UserId -> Maybe ClientId -> @@ -407,7 +424,10 @@ validateToken ut at = do ssoLogin :: ( Member TinyLog r, Member (Embed HttpClientIO) r, - Member NotificationSubsystem r + Member NotificationSubsystem r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => SsoLogin -> CookieType -> @@ -431,7 +451,10 @@ legalHoldLogin :: ( Member GalleyProvider r, Member (Embed HttpClientIO) r, Member NotificationSubsystem r, - Member TinyLog r + Member TinyLog r, + Member (Input (Local ())) r, + Member (Input UTCTime) r, + Member (ConnectionStore InternalPaging) r ) => LegalHoldLogin -> CookieType ->