Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG-draft.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ THIS FILE ACCUMULATES THE RELEASE NOTES FOR THE UPCOMING RELEASE.
## Internal changes

## Federation changes

* Ensure clients only receive messages meant for them in remote convs (#1739)
1 change: 1 addition & 0 deletions libs/tasty-cannon/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies:
- aeson
- async
- base >=4.6 && <5
- bilge
- bytestring
- bytestring-conversion
- data-timeout
Expand Down
72 changes: 64 additions & 8 deletions libs/tasty-cannon/src/Test/Tasty/Cannon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,22 @@ module Test.Tasty.Cannon
-- * WebSockets
WebSocket,
connect,
connectAsClient,
close,
bracket,
bracketAsClient,
bracketN,
bracketAsClientN,

-- ** Random Connection IDs
connectR,
connectAsClientR,
bracketR,
bracketAsClientR,
bracketR2,
bracketR3,
bracketRN,
bracketAsClientRN,

-- * Awaiting & Asserting on Notifications
MatchTimeout (..),
Expand Down Expand Up @@ -63,6 +69,7 @@ module Test.Tasty.Cannon
)
where

import Bilge.Request (queryItem)
import Control.Concurrent.Async
import Control.Concurrent.Timeout hiding (threadDelay)
import Control.Exception (asyncExceptionFromException, throwIO)
Expand Down Expand Up @@ -96,10 +103,16 @@ data WebSocket = WebSocket
}

connect :: MonadIO m => Cannon -> UserId -> ConnId -> m WebSocket
connect can uid cid = liftIO $ do
connect can uid = connectAsMaybeClient can uid Nothing

connectAsClient :: MonadIO m => Cannon -> UserId -> ClientId -> ConnId -> m WebSocket
connectAsClient can uid client = connectAsMaybeClient can uid (Just client)

connectAsMaybeClient :: MonadIO m => Cannon -> UserId -> Maybe ClientId -> ConnId -> m WebSocket
connectAsMaybeClient can uid client conn = liftIO $ do
nchan <- newTChanIO
latch <- newEmptyMVar
wsapp <- run can uid cid (clientApp nchan latch)
wsapp <- run can uid client conn (clientApp nchan latch)
return $ WebSocket nchan latch wsapp

close :: MonadIO m => WebSocket -> m ()
Expand All @@ -114,7 +127,19 @@ bracket ::
ConnId ->
(WebSocket -> m a) ->
m a
bracket can uid cid = Catch.bracket (connect can uid cid) close
bracket can uid conn =
Catch.bracket (connect can uid conn) close

bracketAsClient ::
(MonadMask m, MonadIO m) =>
Cannon ->
UserId ->
ClientId ->
ConnId ->
(WebSocket -> m a) ->
m a
bracketAsClient can uid client conn =
Catch.bracket (connectAsClient can uid client conn) close

bracketN ::
(MonadIO m, MonadMask m) =>
Expand All @@ -127,16 +152,35 @@ bracketN c us f = go [] us
go wss [] = f (reverse wss)
go wss ((x, y) : xs) = bracket c x y (\ws -> go (ws : wss) xs)

bracketAsClientN ::
(MonadMask m, MonadIO m) =>
Cannon ->
[(UserId, ClientId, ConnId)] ->
([WebSocket] -> m a) ->
m a
bracketAsClientN c us f = go [] us
where
go wss [] = f (reverse wss)
go wss ((x, y, z) : xs) = bracketAsClient c x y z (\ws -> go (ws : wss) xs)

-- Random Connection IDs

connectR :: MonadIO m => Cannon -> UserId -> m WebSocket
connectR can uid = randomConnId >>= connect can uid

connectAsClientR :: MonadIO m => Cannon -> UserId -> ClientId -> m WebSocket
connectAsClientR can uid clientId = randomConnId >>= connectAsClient can uid clientId

bracketR :: (MonadIO m, MonadMask m) => Cannon -> UserId -> (WebSocket -> m a) -> m a
bracketR can usr f = do
cid <- randomConnId
bracket can usr cid f

bracketAsClientR :: (MonadIO m, MonadMask m) => Cannon -> UserId -> ClientId -> (WebSocket -> m a) -> m a
bracketAsClientR can usr clientId f = do
connId <- randomConnId
bracketAsClient can usr clientId connId f

bracketR2 ::
(MonadIO m, MonadMask m) =>
Cannon ->
Expand Down Expand Up @@ -174,6 +218,17 @@ bracketRN c us f = go [] us
go wss [] = f (reverse wss)
go wss (x : xs) = bracketR c x (\ws -> go (ws : wss) xs)

bracketAsClientRN ::
(MonadIO m, MonadMask m) =>
Cannon ->
[(UserId, ClientId)] ->
([WebSocket] -> m a) ->
m a
bracketAsClientRN can us f = go [] us
where
go wss [] = f (reverse wss)
go wss ((u, c) : xs) = bracketAsClientR can u c (\ws -> go (ws : wss) xs)

-----------------------------------------------------------------------------
-- Awaiting & Asserting on Notifications

Expand Down Expand Up @@ -336,8 +391,8 @@ randomConnId = liftIO $ do

-- | Start a client thread in 'Async' that opens a web socket to a Cannon, wait
-- for the connection to register with Gundeck, and return the 'Async' thread.
run :: MonadIO m => Cannon -> UserId -> ConnId -> WS.ClientApp () -> m (Async ())
run (($ Http.defaultRequest) -> ca) uid cid app = liftIO $ do
run :: MonadIO m => Cannon -> UserId -> Maybe ClientId -> ConnId -> WS.ClientApp () -> m (Async ())
run cannon@(($ Http.defaultRequest) -> ca) uid client connId app = liftIO $ do
latch <- newEmptyMVar
wsapp <-
async $
Expand All @@ -359,17 +414,18 @@ run (($ Http.defaultRequest) -> ca) uid cid app = liftIO $ do
where
caHost = C.unpack (Http.host ca)
caPort = Http.port ca
caPath = "/await" ++ C.unpack (Http.queryString ca)
caPath = "/await" ++ C.unpack caQuery
caQuery = Http.queryString . cannon . maybe id (queryItem "client" . toByteString') client $ Http.defaultRequest
caOpts = WS.defaultConnectionOptions
caHdrs = [("Z-User", toByteString' uid), ("Z-Connection", toByteString' cid)]
caHdrs = [("Z-User", toByteString' uid), ("Z-Connection", toByteString' connId)]
numRetries = 30
waitForRegistry 0 = throwIO $ RegistrationTimeout numRetries
waitForRegistry (n :: Int) = do
man <- newManager defaultManagerSettings
let ca' =
ca
{ method = "HEAD",
path = "/i/presences/" <> toByteString' uid <> "/" <> toByteString' cid
path = "/i/presences/" <> toByteString' uid <> "/" <> toByteString' connId
}
res <- httpLbs ca' man
unless (responseStatus res == status200) $ do
Expand Down
3 changes: 2 additions & 1 deletion libs/tasty-cannon/tasty-cannon.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ cabal-version: 1.12
--
-- see: https://github.com/sol/hpack
--
-- hash: b7fca22ffa51fd956424d50af91793bd16e3d1b5170e6ccc48b26bf821793358
-- hash: 3e4d6b79f93c721b5df897b6653023feaa197910713bf3a2a759ea37ca05427f

name: tasty-cannon
version: 0.4.0
Expand All @@ -30,6 +30,7 @@ library
aeson
, async
, base >=4.6 && <5
, bilge
, bytestring
, bytestring-conversion
, data-timeout
Expand Down
63 changes: 56 additions & 7 deletions services/galley/src/Galley/API/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,37 @@
-- with this program. If not, see <https://www.gnu.org/licenses/>.
module Galley.API.Federation where

import Control.Lens (itraversed, (<.>))
import Control.Monad.Catch (throwM)
import Control.Monad.Except (runExceptT)
import Data.ByteString.Conversion (toByteString')
import Data.Containers.ListUtils (nubOrd)
import Data.Domain
import Data.Id (ConvId)
import Data.Id (ConvId, UserId)
import Data.Json.Util (Base64ByteString (..))
import Data.List1 (list1)
import qualified Data.Map as Map
import Data.Map.Lens (toMapOf)
import Data.Qualified (Qualified (..))
import qualified Data.Set as Set
import Data.Tagged
import qualified Data.Text.Lazy as LT
import Galley.API.Error (invalidPayload)
import qualified Galley.API.Mapping as Mapping
import Galley.API.Message (UserType (..), postQualifiedOtrMessage)
import Galley.API.Message (MessageMetadata (..), UserType (..), postQualifiedOtrMessage, sendLocalMessages)
import qualified Galley.API.Update as API
import Galley.API.Util (fromRegisterConversation, pushConversationEvent, viewFederationDomain)
import Galley.App (Galley)
import qualified Galley.Data as Data
import Galley.Types.Conversations.Members (InternalMember (..), LocalMember)
import Imports
import Servant (ServerT)
import Servant.API.Generic (ToServantApi)
import Servant.Server.Generic (genericServerT)
import Wire.API.Conversation.Member (OtherMember (..), memId)
import qualified System.Logger.Class as Log
import qualified Wire.API.Conversation as Public
import Wire.API.Conversation.Member (OtherMember (..))
import qualified Wire.API.Conversation.Role as Public
import Wire.API.Event.Conversation
import Wire.API.Federation.API.Galley
( ConversationMemberUpdate (..),
Expand All @@ -52,6 +61,7 @@ import Wire.API.Federation.API.Galley
)
import qualified Wire.API.Federation.API.Galley as FederationAPIGalley
import Wire.API.ServantProto (FromProto (..))
import Wire.API.User.Client (userClientMap)

federationSitemap :: ServerT (ToServantApi FederationAPIGalley.Api) Galley
federationSitemap =
Expand Down Expand Up @@ -83,7 +93,7 @@ registerConversation rc = do
(rcOrigUserId rc)
(rcTime rc)
(EdConversation c)
pushConversationEvent Nothing event [memId mem] []
pushConversationEvent Nothing event [Public.memId mem] []

getConversations :: GetConversationsRequest -> Galley GetConversationsResponse
getConversations (GetConversationsRequest qUid gcrConvIds) = do
Expand Down Expand Up @@ -144,10 +154,49 @@ leaveConversation requestingDomain lc = do
API.removeMemberFromLocalConv leaver Nothing (lcConvId lc) leaver

-- FUTUREWORK: report errors to the originating backend
-- FUTUREWORK: error handling for missing / mismatched clients
receiveMessage :: Domain -> RemoteMessage ConvId -> Galley ()
receiveMessage domain =
API.postRemoteToLocal
. fmap (Tagged . (`Qualified` domain))
receiveMessage domain rmUnqualified = do
let rm = fmap (Tagged . (`Qualified` domain)) rmUnqualified
let convId = unTagged $ rmConversation rm
msgMetadata =
MessageMetadata
{ mmNativePush = rmPush rm,
mmTransient = rmTransient rm,
mmNativePriority = rmPriority rm,
mmData = rmData rm
}
recipientMap = userClientMap $ rmRecipients rm
msgs = toMapOf (itraversed <.> itraversed) recipientMap
(members, allMembers) <- Data.filterRemoteConvMembers (Map.keys recipientMap) convId
unless allMembers $
Log.warn $
Log.field "conversation" (toByteString' (qUnqualified convId))
Log.~~ Log.field "domain" (toByteString' (qDomain convId))
Log.~~ Log.msg
( "Attempt to send remote message to local\
\ users not in the conversation" ::
ByteString
)
localMembers <- sequence $ Map.fromSet mkLocalMember (Set.fromList members)
void $ sendLocalMessages (rmTime rm) (rmSender rm) (rmSenderClient rm) Nothing convId localMembers msgMetadata msgs
where
-- FUTUREWORK: https://wearezeta.atlassian.net/browse/SQCORE-875
mkLocalMember :: UserId -> Galley LocalMember
mkLocalMember m =
pure $
InternalMember
{ memId = m,
memService = Nothing,
memOtrMuted = False,
memOtrMutedStatus = Nothing,
memOtrMutedRef = Nothing,
memOtrArchived = False,
memOtrArchivedRef = Nothing,
memHidden = False,
memHiddenRef = Nothing,
memConvRoleName = Public.roleNameWireMember
}

sendMessage :: Domain -> MessageSendRequest -> Galley MessageSendResponse
sendMessage originDomain msr = do
Expand Down
Loading