Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion services/cannon/src/Cannon/App.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand Down Expand Up @@ -115,12 +116,26 @@ readLoop ws s = loop
send (connection ws) (pong p)
loop
ControlMessage (Close _ _) -> return ()
_ -> reset counter s 0 >> loop
perhapsPingMsg -> do
reset counter s 0
when (isAppLevelPing perhapsPingMsg) sendAppLevelPong
loop

adjustPingFreq p = case fromByteString (toStrict p) of
Just i | i > 0 && i < maxPingInterval -> reset pingFreq s (i # Second)
_ -> return ()

-- control messages are internal to the browser that manages the websockets
-- <https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets>.
-- since the browser may silently lose a websocket connection, wire clients are allowed send
-- 'DataMessage' pings as well, and we respond with a 'DataMessage' pong to allow them to
-- reliably decide whether the connection is still alive.
isAppLevelPing = \case
(DataMessage _ _ _ (Text "ping" _)) -> True
(DataMessage _ _ _ (Binary "ping")) -> True
_ -> False
sendAppLevelPong = sendMsgIO "pong" ws

rejectOnError :: PendingConnection -> HandshakeException -> IO a
rejectOnError p x = do
let f lb mg = toStrict . encode $ Error status400 lb mg
Expand Down
7 changes: 6 additions & 1 deletion services/cannon/src/Cannon/WS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Cannon.WS
, isRemoteRegistered
, registerRemote
, sendMsg
, sendMsgIO

, Clock
, mkClock
Expand Down Expand Up @@ -203,7 +204,11 @@ sendMsg :: L.ByteString -> Key -> Websocket -> WS ()
sendMsg m k c = do
let kb = key2bytes k
trace $ client kb . msg (val "sendMsg: \"" +++ L.take 128 m +++ val "...\"")
liftIO $ recoverAll retry3x $ const $ sendBinaryData (connection c) m
liftIO $ sendMsgIO m c

sendMsgIO :: L.ByteString -> Websocket -> IO ()
sendMsgIO m c = do
recoverAll retry3x $ const $ sendBinaryData (connection c) m

close :: Key -> Websocket -> WS ()
close k c = do
Expand Down
58 changes: 49 additions & 9 deletions services/gundeck/test/integration/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ module API (TestSetup(..), tests) where
import Bilge
import Bilge.Assert
import Control.Arrow ((&&&))
import Control.Concurrent.Async (Async, async, wait, forConcurrently_)
import Control.Lens ((.~), (^.), (^?), view, (<&>))
import Control.Concurrent.Async (Async, async, wait, concurrently_, forConcurrently_)
import Control.Lens ((.~), (^.), (^?), view, (<&>), _2, (%~))
import Control.Retry (retrying, constantDelay, limitRetries)
import Data.Aeson hiding (json)
import Data.Aeson.Lens
Expand Down Expand Up @@ -112,6 +112,10 @@ tests s = testGroup "Gundeck integration tests" [
[ test s "register a push token" $ testRegisterPushToken
, test s "unregister a push token" $ testUnregisterPushToken
],
testGroup "Websocket pingpong"
[ test s "pings produce pongs" $ testPingPong
, test s "non-pings are ignored" $ testNoPingNoPong
],
-- TODO: The following tests require (at the moment), the usage real AWS
-- services so they are kept in a separate group to simplify testing
testGroup "RealAWS"
Expand Down Expand Up @@ -719,6 +723,28 @@ testUnregisterPushToken g _ b _ = do
void $ retryWhileN 12 (not . null) (listPushTokens uid g)
unregisterPushToken uid (tkn^.token) g !!! const 404 === statusCode

testPingPong :: TestSignature ()
testPingPong gu ca _ _ = do
uid :: UserId <- randomId
connid :: ConnId <- randomConnId
[(_, [(chread, chwrite)] :: [(TChan ByteString, TChan ByteString)])]
<- connectUsersAndDevicesWithSendingClients gu ca [(uid, [connid])]
liftIO $ do
atomically $ writeTChan chwrite "ping"
msg <- waitForMessage chread
assertBool "no pong" $ msg == Just "pong"

testNoPingNoPong :: TestSignature ()
testNoPingNoPong gu ca _ _ = do
uid :: UserId <- randomId
connid :: ConnId <- randomConnId
[(_, [(chread, chwrite)] :: [(TChan ByteString, TChan ByteString)])]
<- connectUsersAndDevicesWithSendingClients gu ca [(uid, [connid])]
liftIO $ do
atomically $ writeTChan chwrite "Wire is so much nicer with internet!"
msg <- waitForMessage chread
assertBool "unexpected response on non-ping" $ isNothing msg

testSharePushToken :: TestSignature ()
testSharePushToken g _ b _ = do
gcmTok <- Token . T.decodeUtf8 . toByteString' <$> randomId
Expand Down Expand Up @@ -816,13 +842,25 @@ connectUser gu ca uid con = do
[(_, [ch])] <- connectUsersAndDevices gu ca [(uid, [con])]
return ch

connectUsersAndDevices :: HasCallStack => Gundeck -> Cannon -> [(UserId, [ConnId])] -> Http [(UserId, [TChan ByteString])]
connectUsersAndDevices gu ca uidsAndConnIds = do
connectUsersAndDevices
:: HasCallStack
=> Gundeck -> Cannon -> [(UserId, [ConnId])]
-> Http [(UserId, [TChan ByteString])]
connectUsersAndDevices gu ca uidsAndConnIds =
strip <$> connectUsersAndDevicesWithSendingClients gu ca uidsAndConnIds
where strip = fmap (_2 %~ fmap fst)

connectUsersAndDevicesWithSendingClients
:: HasCallStack
=> Gundeck -> Cannon -> [(UserId, [ConnId])]
-> Http [(UserId, [(TChan ByteString, TChan ByteString)])]
connectUsersAndDevicesWithSendingClients gu ca uidsAndConnIds = do
chs <- forM uidsAndConnIds $ \(uid, conns) -> (uid,) <$> do
forM conns $ \conn -> do
ch <- liftIO $ atomically newTChan
_ <- wsRun ca uid conn (wsReader ch)
pure ch
chread <- liftIO $ atomically newTChan
chwrite <- liftIO $ atomically newTChan
_ <- wsRun ca uid conn (wsReaderWriter chread chwrite)
pure (chread, chwrite)
(\(uid, conns) -> wsAssertPresences gu uid (length conns)) `mapM_` uidsAndConnIds
pure chs

Expand All @@ -849,8 +887,10 @@ wsAssertPresences gu uid numPres = do
wsCloser :: MVar () -> WS.ClientApp ()
wsCloser m conn = takeMVar m >> WS.sendClose conn C.empty >> putMVar m ()

wsReader :: TChan ByteString -> WS.ClientApp ()
wsReader ch conn = forever $ WS.receiveData conn >>= atomically . writeTChan ch
wsReaderWriter :: TChan ByteString -> TChan ByteString -> WS.ClientApp ()
wsReaderWriter chread chwrite conn = concurrently_
(forever $ WS.receiveData conn >>= atomically . writeTChan chread)
(forever $ WS.sendTextData conn =<< atomically (readTChan chwrite))

retryWhile :: (MonadIO m) => (a -> Bool) -> m a -> m a
retryWhile = retryWhileN 10
Expand Down