Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ kube-integration-test:
kube-integration-teardown:
export NAMESPACE=$(NAMESPACE); ./hack/bin/integration-teardown-federation.sh

.PHONY: kube-integration-e2e-telepresence
kube-integration-e2e-telepresence:
./services/brig/federation-tests.sh $(NAMESPACE)

.PHONY: kube-integration-setup-sans-federation
kube-integration-setup-sans-federation: guard-tag charts-integration
# by default "test-<your computer username> is used as namespace
Expand Down
1 change: 1 addition & 0 deletions changelog.d/6-federation/notify-remotes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Notify remote participants when a user leaves a conversation because they were deleted
12 changes: 11 additions & 1 deletion services/brig/federation-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,14 @@ kubectl -n "$NAMESPACE" get configmap brig -o jsonpath='{.data.brig\.yaml}' >b.y
sed -i "s=privateKeys: /etc/wire/brig/secrets/secretkey.txt=privateKeys: test/resources/zauth/privkeys.txt=g" b.yaml
sed -i "s=publicKeys: /etc/wire/brig/secrets/publickey.txt=publicKeys: test/resources/zauth/pubkeys.txt=g" b.yaml

telepresence --namespace "$NAMESPACE" --also-proxy cassandra-ephemeral --run bash -c "export INTEGRATION_FEDERATION_TESTS=1; ./dist/brig-integration -p federation-end2end-user -i i.yaml -s b.yaml"
# We need to pass --also-proxy to cannon pod IPs, as for some reason (maybe due
# to calico) the pod IPs in some clusters are not within the podCIDR range
# defined on the nodes and cannons need to be accessed directly (without using
# the kubernetes services)
declare -a alsoProxyOptions
while read -r ip; do
alsoProxyOptions+=("--also-proxy=${ip}")
done < <(kubectl get pods -n "$NAMESPACE" -l wireService=cannon -o json | jq -r '.items[].status.podIPs[].ip')

# shellcheck disable=SC2086
telepresence --namespace "$NAMESPACE" --also-proxy=cassandra-ephemeral ${alsoProxyOptions[*]} --run bash -c "export INTEGRATION_FEDERATION_TESTS=1; ./dist/brig-integration -p federation-end2end-user -i i.yaml -s b.yaml"
14 changes: 14 additions & 0 deletions services/brig/test/integration/API/User/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ import qualified Data.Vector as Vec
import Federation.Util (withTempMockFederator)
import Gundeck.Types (Notification (..))
import Imports
import qualified Test.Tasty.Cannon as WS
import Test.Tasty.HUnit
import Util
import qualified Wire.API.Event.Conversation as Conv
import qualified Wire.API.Federation.API.Brig as F
import Wire.API.Federation.GRPC.Types hiding (body, path)
import qualified Wire.API.Federation.GRPC.Types as F
Expand Down Expand Up @@ -462,3 +464,15 @@ matchDeleteUserNotification quid n = do
etype @?= Just "user.delete"
eUnqualifiedId @?= Just (qUnqualified quid)
eQualifiedId @?= Just quid

matchConvLeaveNotification :: Qualified ConvId -> Qualified UserId -> [Qualified UserId] -> Notification -> IO ()
matchConvLeaveNotification conv remover removeds n = do
let e = List1.head (WS.unpackPayload n)
ntfTransient n @?= False
Conv.evtConv e @?= conv
Conv.evtType e @?= Conv.MemberLeave
Conv.evtFrom e @?= remover
sorted (Conv.evtData e) @?= sorted (Conv.EdMembersLeave (Conv.QualifiedUserIdList removeds))
where
sorted (Conv.EdMembersLeave (Conv.QualifiedUserIdList m)) = Conv.EdMembersLeave (Conv.QualifiedUserIdList (sort m))
sorted x = x
28 changes: 26 additions & 2 deletions services/brig/test/integration/Federation/End2end.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
module Federation.End2end where

import API.Search.Util
import API.User.Util (getUserClientsQualified)
import API.User.Util
import Bilge
import Bilge.Assert ((!!!), (<!!), (===))
import Brig.API.Client (pubClient)
Expand Down Expand Up @@ -98,7 +98,8 @@ spec _brigOpts mg brig galley cannon _federator brigTwo galleyTwo =
test mg "leave a remote conversation" $ leaveRemoteConversation brig galley brigTwo galleyTwo,
test mg "include remote users to new conversation" $ testRemoteUsersInNewConv brig galley brigTwo galleyTwo,
test mg "send a message to a remote user" $ testSendMessage brig brigTwo galleyTwo cannon,
test mg "send a message in a remote conversation" $ testSendMessageToRemoteConv brig brigTwo galley galleyTwo cannon
test mg "send a message in a remote conversation" $ testSendMessageToRemoteConv brig brigTwo galley galleyTwo cannon,
test mg "delete user connected to remotes and in conversation with remotes" $ testDeleteUser brig brigTwo galley galleyTwo cannon
]

-- | Path covered by this test:
Expand Down Expand Up @@ -595,3 +596,26 @@ testSendMessageToRemoteConv brig1 brig2 galley1 galley2 cannon1 = do
@?= EdOtrMessage
( OtrMessage bobClient aliceClient (toBase64Text msgText) (Just "")
)

testDeleteUser :: Brig -> Brig -> Galley -> Galley -> Cannon -> Http ()
testDeleteUser brig1 brig2 galley1 galley2 cannon1 = do
alice <- userQualifiedId <$> randomUser brig1
bobDel <- userQualifiedId <$> randomUser brig2

connectUsersEnd2End brig1 brig2 alice bobDel

conv1 <-
fmap cnvQualifiedId . responseJsonError
=<< createConversation galley1 (qUnqualified alice) [bobDel]
<!! const 201 === statusCode

conv2 <-
fmap cnvQualifiedId . responseJsonError
=<< createConversation galley2 (qUnqualified bobDel) [alice]
<!! const 201 === statusCode

WS.bracketR cannon1 (qUnqualified alice) $ \wsAlice -> do
deleteUser (qUnqualified bobDel) (Just defPassword) brig2 !!! const 200 === statusCode
WS.assertMatch_ (5 # Second) wsAlice $ matchDeleteUserNotification bobDel
WS.assertMatch_ (5 # Second) wsAlice $ matchConvLeaveNotification conv1 bobDel [bobDel]
WS.assertMatch_ (5 # Second) wsAlice $ matchConvLeaveNotification conv2 bobDel [bobDel]
66 changes: 47 additions & 19 deletions services/galley/src/Galley/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import Data.List1 (maybeList1)
import Data.Qualified
import Data.Range
import Data.String.Conversions (cs)
import qualified Data.Text as T
import Data.Time
import GHC.TypeLits (AppendSymbol)
import qualified Galley.API.Clients as Clients
Expand Down Expand Up @@ -87,9 +86,11 @@ import Servant.Server.Generic (genericServerT)
import System.Logger.Class hiding (Path, name)
import qualified System.Logger.Class as Log
import Wire.API.Conversation (ConvIdsPage, pattern GetPaginatedConversationIds)
import Wire.API.Conversation.Action (ConversationAction (ConversationActionRemoveMembers))
import Wire.API.ErrorDescription (MissingLegalholdConsent)
import Wire.API.Federation.API.Galley (UserDeletedConversationsNotification (UserDeletedConversationsNotification))
import Wire.API.Federation.API.Galley (ConversationUpdate (..), UserDeletedConversationsNotification (UserDeletedConversationsNotification))
import qualified Wire.API.Federation.API.Galley as FedGalley
import Wire.API.Federation.Client (FederationError)
import Wire.API.Routes.MultiTablePaging (mtpHasMore, mtpPagingState, mtpResults)
import Wire.API.Routes.MultiVerb (MultiVerb, RespondEmpty)
import Wire.API.Routes.Public (ZOptConn, ZUser)
Expand Down Expand Up @@ -533,50 +534,77 @@ rmUser user conn = do
leaveLocalConversations :: Member MemberStore r => [ConvId] -> Galley r ()
leaveLocalConversations ids = do
localDomain <- viewFederationDomain
let qUser = Qualified user localDomain
cc <- liftSem $ getConversations ids
now <- liftIO getCurrentTime
pp <- for cc $ \c -> case Data.convType c of
SelfConv -> return Nothing
One2OneConv -> liftSem $ deleteMembers (Data.convId c) (UserList [user] []) $> Nothing
ConnectConv -> liftSem $ deleteMembers (Data.convId c) (UserList [user] []) $> Nothing
RegularConv
| user `isMember` Data.convLocalMembers c -> do
liftSem $ deleteMembers (Data.convId c) (UserList [user] [])
now <- liftIO getCurrentTime
let e =
Event
MemberLeave
(Qualified (Data.convId c) localDomain)
(Qualified user localDomain)
now
(EdMembersLeave (QualifiedUserIdList [Qualified user localDomain]))
return $
(EdMembersLeave (QualifiedUserIdList [qUser]))
for_ (bucketRemote (fmap rmId (Data.convRemoteMembers c))) $ notifyRemoteMembers now qUser (Data.convId c)
pure $
Intra.newPushLocal ListComplete user (Intra.ConvEvent e) (Intra.recipient <$> Data.convLocalMembers c)
<&> set Intra.pushConn conn
. set Intra.pushRoute Intra.RouteDirect
| otherwise -> return Nothing

for_
(maybeList1 (catMaybes pp))
(liftSem . push)

-- FUTUREWORK: This could be optimized to reduce the number of RPCs
-- made. When a team is deleted the burst of RPCs created here could
-- lead to performance issues. We should cover this in a performance
-- test.
notifyRemoteMembers :: UTCTime -> Qualified UserId -> ConvId -> Remote [UserId] -> Galley r ()
notifyRemoteMembers now qUser cid remotes = do
localDomain <- viewFederationDomain
let convUpdate =
ConversationUpdate
{ cuTime = now,
cuOrigUserId = qUser,
cuConvId = cid,
cuAlreadyPresentUsers = tUnqualified remotes,
cuAction = ConversationActionRemoveMembers (pure qUser)
}
let rpc = FedGalley.onConversationUpdated FedGalley.clientRoutes localDomain convUpdate
liftSem (runFederatedEither remotes rpc)
>>= logAndIgnoreError "Error in onConversationUpdated call" (qUnqualified qUser)

leaveRemoteConversations :: Local UserId -> Range 1 FedGalley.UserDeletedNotificationMaxConvs [Remote ConvId] -> Galley r ()
leaveRemoteConversations lusr cids = do
for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do
let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs))
let rpc = FedGalley.onUserDeleted FedGalley.clientRoutes (tDomain lusr) userDelete
res <- liftSem $ runFederatedEither remoteConvs rpc
case res of
-- FUTUREWORK: Add a retry mechanism if there are federation errrors.
-- See https://wearezeta.atlassian.net/browse/SQCORE-1091
Left federationError -> do
Log.err $
Log.msg $
T.unwords
[ "Federation error while notifying remote backends of a user deletion (Galley).",
"user_id: " <> (cs . show) lusr,
"details: " <> (cs . show) federationError
]
pure ()
Right _ -> pure ()
liftSem (runFederatedEither remoteConvs rpc)
>>= logAndIgnoreError "Error in onUserDeleted call" (tUnqualified lusr)

-- FUTUREWORK: Add a retry mechanism if there are federation errrors.
-- See https://wearezeta.atlassian.net/browse/SQCORE-1091
logAndIgnoreError :: Text -> UserId -> Either FederationError a -> Galley r ()
logAndIgnoreError message usr res = do
case res of
Left federationError -> do
Log.err
( Log.msg
( "Federation error while notifying remote backends of a user deletion (Galley). "
<> message
<> " "
<> (cs . show $ federationError)
)
. Log.field "user" (show usr)
)
Right _ -> pure ()

deleteLoop :: Galley r ()
deleteLoop = liftGalley0 $ do
Expand Down
52 changes: 42 additions & 10 deletions services/galley/test/integration/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ import Wire.API.Conversation.Action
import qualified Wire.API.Federation.API.Brig as FederatedBrig
import Wire.API.Federation.API.Galley
( Api (onConversationUpdated),
ConversationUpdate (cuAction, cuAlreadyPresentUsers, cuOrigUserId),
ConversationUpdate (cuAction, cuAlreadyPresentUsers, cuConvId, cuOrigUserId),
GetConversationsResponse (..),
RemoteConvMembers (..),
RemoteConversation (..),
Expand Down Expand Up @@ -3200,17 +3200,19 @@ removeUser = do
let [alice', alexDel', amy'] = qUnqualified <$> [alice, alexDel, amy]
let bDomain = Domain "b.example.com"
bart <- randomQualifiedId bDomain
berta <- randomQualifiedId bDomain
let cDomain = Domain "c.example.com"
carl <- randomQualifiedId cDomain

connectUsers alice' (list1 alexDel' [amy'])
connectWithRemoteUser alice' bart
connectWithRemoteUser alice' berta
connectWithRemoteUser alexDel' bart
connectWithRemoteUser alice' carl
connectWithRemoteUser alexDel' carl

convA1 <- decodeConvId <$> postConv alice' [alexDel'] (Just "gossip") [] Nothing Nothing
convA2 <- decodeConvId <$> postConv alice' [alexDel', amy'] (Just "gossip2") [] Nothing Nothing
convA2 <- decodeConvId <$> postConvWithRemoteUsers alice' defNewConv {newConvQualifiedUsers = [alexDel, amy, berta]}
convA3 <- decodeConvId <$> postConv alice' [amy'] (Just "gossip3") [] Nothing Nothing
convA4 <- decodeConvId <$> postConvWithRemoteUsers alice' defNewConv {newConvQualifiedUsers = [alexDel, bart, carl]}
convB1 <- randomId -- a remote conversation at 'bDomain' that Alice, AlexDel and Bart will be in
Expand Down Expand Up @@ -3238,35 +3240,65 @@ removeUser = do
FederatedGalley.onConversationCreated fedGalleyClient bDomain $ nc convB2 bart [alexDel]
FederatedGalley.onConversationCreated fedGalleyClient cDomain $ nc convC1 carl [alexDel]

localDomain <- viewFederationDomain

WS.bracketR3 c alice' alexDel' amy' $ \(wsAlice, wsAlexDel, wsAmy) -> do
let galleyApi _domain =
emptyFederatedGalley
{ FederatedGalley.leaveConversation = \_domain _update ->
pure (FederatedGalley.LeaveConversationResponse (Right ())),
FederatedGalley.onConversationUpdated = \_domain _convUpdate ->
pure ()
}
(_, fedRequests) <-
withTempMockFederator (const (FederatedGalley.LeaveConversationResponse (Right ()))) $
withTempServantMockFederator (const emptyFederatedBrig) galleyApi localDomain $
deleteUser alexDel' !!! const 200 === statusCode

-- FUTUTREWORK: There should be 4 requests, one to each domain for telling
-- them that alex left the conversation hosted locally. Add assertions for
-- that and implement it.
liftIO $ do
assertEqual ("expect exactly 2 federated requests in : " <> show fedRequests) 2 (length fedRequests)
bReq <- assertOne $ filter (\req -> F.domain req == domainText bDomain) fedRequests
cReq <- assertOne $ filter (\req -> F.domain req == domainText cDomain) fedRequests
assertEqual ("expect exactly 5 federated requests in : " <> show fedRequests) 5 (length fedRequests)

liftIO $ do
bReq <- assertOne $ filter (matchFedRequest bDomain "/federation/on-user-deleted/conversations") fedRequests
fmap F.component (F.request bReq) @?= Just F.Galley
fmap F.path (F.request bReq) @?= Just "/federation/on-user-deleted/conversations"
Just (Right udcnB) <- pure $ fmap (eitherDecode . LBS.fromStrict . F.body) (F.request bReq)
sort (fromRange (FederatedGalley.udcnConversations udcnB)) @?= sort [convB1, convB2]
FederatedGalley.udcnUser udcnB @?= qUnqualified alexDel

fmap F.component (F.request bReq) @?= Just F.Galley
liftIO $ do
cReq <- assertOne $ filter (matchFedRequest cDomain "/federation/on-user-deleted/conversations") fedRequests
fmap F.component (F.request cReq) @?= Just F.Galley
fmap F.path (F.request cReq) @?= Just "/federation/on-user-deleted/conversations"
Just (Right udcnC) <- pure $ fmap (eitherDecode . LBS.fromStrict . F.body) (F.request cReq)
sort (fromRange (FederatedGalley.udcnConversations udcnC)) @?= sort [convC1]
FederatedGalley.udcnUser udcnC @?= qUnqualified alexDel

liftIO $ do
WS.assertMatchN_ (5 # Second) [wsAlice, wsAlexDel] $
wsAssertMembersLeave qconvA1 alexDel [alexDel]
WS.assertMatchN_ (5 # Second) [wsAlice, wsAlexDel, wsAmy] $
wsAssertMembersLeave qconvA2 alexDel [alexDel]

liftIO $ do
let bConvUpdateRPCs = filter (matchFedRequest bDomain "/federation/on-conversation-updated") fedRequests
bConvUpdatesEither :: [Either String ConversationUpdate] <- eitherDecode . LBS.fromStrict . F.body <$$> mapM (assertJust . F.request) bConvUpdateRPCs
bConvUpdates <- mapM assertRight bConvUpdatesEither

bConvUpdatesA2 <- assertOne $ filter (\cu -> cuConvId cu == convA2) bConvUpdates
cuAction bConvUpdatesA2 @?= ConversationActionRemoveMembers (pure alexDel)
cuAlreadyPresentUsers bConvUpdatesA2 @?= [qUnqualified berta]

bConvUpdatesA4 <- assertOne $ filter (\cu -> cuConvId cu == convA4) bConvUpdates
cuAction bConvUpdatesA4 @?= ConversationActionRemoveMembers (pure alexDel)
cuAlreadyPresentUsers bConvUpdatesA4 @?= [qUnqualified bart]

liftIO $ do
cConvUpdateRPC <- assertOne $ filter (matchFedRequest cDomain "/federation/on-conversation-updated") fedRequests
Just (Right convUpdate) <- pure $ fmap (eitherDecode . LBS.fromStrict . F.body) (F.request cConvUpdateRPC)
cuConvId convUpdate @?= convA4
cuAction convUpdate @?= ConversationActionRemoveMembers (pure alexDel)
cuAlreadyPresentUsers convUpdate @?= [qUnqualified carl]

-- Check memberships
mems1 <- fmap cnvMembers . responseJsonError =<< getConv alice' convA1
mems2 <- fmap cnvMembers . responseJsonError =<< getConv alice' convA2
Expand Down
5 changes: 5 additions & 0 deletions services/galley/test/integration/API/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2512,3 +2512,8 @@ generateRemoteAndConvIdWithDomain remoteDomain shouldBeLocal lUserId = do
if shouldBeLocal == isLocal
then pure (qTagUnsafe other, convId)
else generateRemoteAndConvIdWithDomain remoteDomain shouldBeLocal lUserId

matchFedRequest :: Domain -> ByteString -> FederatedRequest -> Bool
matchFedRequest domain reqpath req =
F.domain req == domainText domain
&& fmap F.path (F.request req) == Just reqpath