diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 303bb55be..f894d7c4e 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -173,7 +173,7 @@ import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (isJust, isNothing, listToMaybe) +import Data.Maybe (catMaybes, isJust, isNothing, listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -683,14 +683,15 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess serverDown :: ([RcvQueue], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do incClientStat' c userId client "DISCONNECT" "" - notifySub "" $ hostEvent' DISCONNECT client - unless (null conns) $ notifySub "" $ DOWN srv conns + notifySub c "" $ hostEvent' DISCONNECT client + unless (null conns) $ notifySub c "" $ DOWN srv conns unless (null qs) $ do atomically $ mapM_ (releaseGetLock c) qs runReaderT (resubscribeSMPSession c tSess) env - notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> IO () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) +{-# INLINE notifySub #-} +notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> AEvent e -> m () +notifySub c connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do @@ -722,26 +723,22 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do removeSessVar v tSess smpSubWorkers reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM' () -reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do +reconnectSMPClient c tSess qs = handleNotify $ do cs <- readTVarIO $ RQ.getConnections $ activeSubs c (rs, sessId_) <- subscribeQueues c $ L.toList qs let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs - conns = filter (`M.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + mapM_ (\(connId, e) -> notifySub c connId $ ERR e) finalErrs forM_ (listToMaybe tempErrs) $ \(connId, e) -> do when (null okConns && M.null cs && null finalErrs) . liftIO $ forM_ sessId_ $ \sessId -> do -- We only close the client session that was used to subscribe. v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) mapM_ (closeClient_ c) v_ - notifySub connId $ ERR e + notifySub c connId $ ERR e where handleNotify :: AM' () -> AM' () - handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show - notifySub :: forall e. AEntityI e => ConnId -> AEvent e -> AM' () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, AEvt (sAEntity @e) cmd) + handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId, srv, _) = do @@ -1300,14 +1297,10 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey pure (rq, qUri, tSess, sessId) -processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM () +processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM (Maybe ConnId) processSubResult c rq@RcvQueue {connId} = \case - Left e -> - unless (temporaryClientError e) $ - failSubscription c rq e - Right () -> - whenM (hasPendingSubscription c connId) $ - addSubscription c rq + Left e -> Nothing <$ unless (temporaryClientError e) (failSubscription c rq e) + Right () -> ifM (hasPendingSubscription c connId) (Just connId <$ addSubscription c rq) (pure Nothing) temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case @@ -1359,23 +1352,27 @@ subscribeQueues c qs = do subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ()) subscribeQueues_ env session smp qs' = do rs <- sendBatch subscribeSMPQueues smp qs' - active <- + (active, subResults) <- atomically $ ifM (activeClientSession c tSess sessId) - (writeTVar session (Just sessId) >> processSubResults rs $> True) - (pure False) + (writeTVar session (Just sessId) >> ((True,) <$> processSubResults rs)) + (pure (False, [])) if active - then when (hasTempErrors rs) resubscribe $> rs + then do + when (any isNothing subResults) resubscribe + let up = catMaybes $ L.toList subResults + unless (null up) $ notifyUP up + pure rs else do logWarn "subcription batch result for replaced SMP client, resubscribing" resubscribe $> L.map (second $ \_ -> Left PCENetworkError) rs where - tSess = transportSession' smp + notifyUP up = maybe (logError "sndQ full" >> notifyUP up) pure =<< timeout 30000000 (notifySub c "" $ UP srv up) + tSess@(_, srv, _) = transportSession' smp sessId = sessionId $ thParams smp - hasTempErrors = any (either temporaryClientError (const False) . snd) - processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM () - processSubResults = mapM_ $ uncurry $ processSubResult c + processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM (NonEmpty (Maybe ConnId)) + processSubResults = mapM (uncurry $ processSubResult c) resubscribe = resubscribeSMPSession c tSess `runReaderT` env activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index d52c12877..051eb68f7 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -13,6 +13,7 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -Wno-ambiguous-fields #-} {-# OPTIONS_GHC -Wno-orphans #-} module AgentTests.FunctionalAPITests @@ -38,6 +39,7 @@ module AgentTests.FunctionalAPITests rfGet, sfGet, nGet, + nGetUP, (##>), (=##>), pattern CON, @@ -120,7 +122,7 @@ a ##> t = a `shouldRespond` t a =##> p = withTimeout a $ \r -> do unless (p r) $ liftIO $ putStrLn $ "value failed predicate: " <> show r - r `shouldSatisfy` p + withFrozenCallStack $ r `shouldSatisfy` p withTimeout :: (HasCallStack, MonadUnliftIO m) => m a -> (HasCallStack => a -> Expectation) -> m () withTimeout a test = @@ -140,6 +142,13 @@ sfGet c = withFrozenCallStack $ get' @'AESndFile c nGet :: (MonadIO m, HasCallStack) => AgentClient -> m (AEntityTransmission 'AENone) nGet c = withFrozenCallStack $ get' @'AENone c +nGetUP :: (MonadIO m, HasCallStack) => AgentClient -> m (AEntityTransmission 'AENone) +nGetUP c = withFrozenCallStack $ liftIO $ do + timeout 15000000 (pGet' c True False) >>= \case + Just (corrId, connId, AEvt _ cmd@UP {}) -> pure (corrId, connId, cmd) + Just (_, _, AEvt _ cmd) -> error $ "unexpected command " <> show cmd + Nothing -> error "timed out waiting for UP" + get' :: forall e m. (MonadIO m, AEntityI e, HasCallStack) => AgentClient -> m (AEntityTransmission e) get' c = withFrozenCallStack $ do (corrId, connId, AEvt e cmd) <- pGet c @@ -148,18 +157,19 @@ get' c = withFrozenCallStack $ do _ -> error $ "unexpected command " <> show cmd pGet :: forall m. MonadIO m => AgentClient -> m ATransmission -pGet c = pGet' c True +pGet c = pGet' c True True -pGet' :: forall m. MonadIO m => AgentClient -> Bool -> m ATransmission -pGet' c skipWarn = do +pGet' :: forall m. MonadIO m => AgentClient -> Bool -> Bool -> m ATransmission +pGet' c skipWarn skipUp = do t@(_, _, AEvt _ cmd) <- atomically (readTBQueue $ subQ c) case cmd of - CONNECT {} -> pGet c - DISCONNECT {} -> pGet c - ERR (BROKER _ NETWORK) -> pGet c - MWARN {} | skipWarn -> pGet c - RFWARN {} | skipWarn -> pGet c - SFWARN {} | skipWarn -> pGet c + CONNECT {} -> pGet' c skipWarn skipUp + DISCONNECT {} -> pGet' c skipWarn skipUp + ERR (BROKER _ NETWORK) -> pGet' c skipWarn skipUp + MWARN {} | skipWarn -> pGet' c skipWarn skipUp + RFWARN {} | skipWarn -> pGet' c skipWarn skipUp + SFWARN {} | skipWarn -> pGet' c skipWarn skipUp + UP {} | skipUp -> pGet' c skipWarn skipUp _ -> pure t pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> AEvent e @@ -890,7 +900,7 @@ testAsyncServerOffline t = withAgentClients2 $ \alice bob -> do conns `shouldBe` [bobId] -- connection succeeds after server start withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - ("", "", UP srv1 conns1) <- nGet alice + ("", "", UP srv1 conns1) <- nGetUP alice liftIO $ do srv1 `shouldBe` testSMPServer conns1 `shouldBe` [bobId] @@ -930,8 +940,7 @@ testAllowConnectionClientRestart t = do withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile2} testPort2 $ \_ -> do runRight $ do - ("", "", UP _ _) <- nGet bob - + ("", "", UP _ _) <- nGetUP bob subscribeConnection alice2 bobId get alice2 ##> ("", bobId, CON) @@ -1078,7 +1087,7 @@ testDeliverClientRestart t = do withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do - ("", "", UP _ _) <- nGet alice + ("", "", UP _ _) <- nGetUP alice subscribeConnection bob2 aliceId @@ -1201,8 +1210,8 @@ testDeliveryAfterSubscriptionError t = do Left (BROKER _ NETWORK) <- runExceptT $ subscribeConnection b aId pure () withSmpServerStoreLogOn t testPort $ \_ -> runRight $ do - withUP a bId $ \case ("", c, SENT 4) -> c == bId; _ -> False - withUP b aId $ \case ("", c, Msg "hello") -> c == aId; _ -> False + get a =##> \case ("", c, SENT 4) -> c == bId; _ -> False + get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False ackMessage b aId 4 Nothing testMsgDeliveryQuotaExceeded :: HasCallStack => ATransport -> IO () @@ -1214,7 +1223,7 @@ testMsgDeliveryQuotaExceeded t = mId <- sendMessage a bId SMP.noMsgFlags $ "message " <> bshow i get a =##> \case ("", c, SENT mId') -> bId == c && mId == mId'; _ -> False 8 <- sendMessage a bId SMP.noMsgFlags "over quota" - pGet' a False =##> \case ("", c, AEvt _ (MWARN 8 (SMP _ QUOTA))) -> bId == c; _ -> False + pGet' a False True =##> \case ("", c, AEvt _ (MWARN 8 (SMP _ QUOTA))) -> bId == c; _ -> False 4 <- sendMessage a bId' SMP.noMsgFlags "hello" get a =##> \case ("", c, SENT 4) -> bId' == c; _ -> False get b =##> \case ("", c, Msg "message 1") -> aId == c; _ -> False @@ -1245,8 +1254,8 @@ testExpireMessage t = 5 <- runRight $ sendMessage a bId SMP.noMsgFlags "2" -- this won't expire get a =##> \case ("", c, MERR 4 (BROKER _ e)) -> bId == c && (e == TIMEOUT || e == NETWORK); _ -> False withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - withUP a bId $ \case ("", _, SENT 5) -> True; _ -> False - withUP b aId $ \case ("", _, MsgErr 4 (MsgSkipped 3 3) "2") -> True; _ -> False + get a =##> \case ("", _, SENT 5) -> True; _ -> False + get b =##> \case ("", _, MsgErr 4 (MsgSkipped 3 3) "2") -> True; _ -> False ackMessage b aId 4 Nothing testExpireManyMessages :: HasCallStack => ATransport -> IO () @@ -1276,19 +1285,10 @@ testExpireManyMessages t = liftIO $ expected c e `shouldBe` True r -> error $ show r withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - withUP a bId $ \case ("", _, SENT 7) -> True; _ -> False - withUP b aId $ \case ("", _, MsgErr 4 (MsgSkipped 3 5) "4") -> True; _ -> False + get a =##> \case ("", _, SENT 7) -> True; _ -> False + get b =##> \case ("", _, MsgErr 4 (MsgSkipped 3 5) "4") -> True; _ -> False ackMessage b aId 4 Nothing -withUP :: AgentClient -> ConnId -> (AEntityTransmission 'AEConn -> Bool) -> ExceptT AgentErrorType IO () -withUP a bId p = - liftIO $ - getInAnyOrder - a - [ \case ("", "", AEvt SAENone (UP _ [c])) -> c == bId; _ -> False, - \case (corrId, c, AEvt SAEConn cmd) -> c == bId && p (corrId, c, cmd); _ -> False - ] - testExpireMessageQuota :: HasCallStack => ATransport -> IO () testExpireMessageQuota t = withSmpServerConfigOn t cfg {msgQueueQuota = 1} testPort $ \_ -> do a <- getSMPAgentClient' 1 agentCfg {quotaExceededTimeout = 1, messageRetryInterval = fastMessageRetryInterval} initAgentServers testDB @@ -1438,19 +1438,14 @@ testRatchetSyncServerOffline t = withAgentClients2 $ \alice bob -> do withSmpServerStoreMsgLogOn t testPort $ \_ -> do concurrently_ - (getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP]) - (getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP]) + (pGet alice =##> ratchetSyncP' bobId RSAgreed) + (pGet bob2 =##> ratchetSyncP' aliceId RSAgreed) runRight_ $ do get alice =##> ratchetSyncP bobId RSOk get bob2 =##> ratchetSyncP aliceId RSOk exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 disposeAgentClient bob2 -serverUpP :: ATransmission -> Bool -serverUpP = \case - ("", "", AEvt SAENone (UP _ _)) -> True - _ -> False - testRatchetSyncClientRestart :: HasCallStack => ATransport -> IO () testRatchetSyncClientRestart t = do alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB @@ -1465,7 +1460,7 @@ testRatchetSyncClientRestart t = do bob3 <- getSMPAgentClient' 3 agentCfg initAgentServers testDB2 withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do - ("", "", UP _ _) <- nGet alice + ("", "", UP _ _) <- nGetUP alice subscribeConnection bob3 aliceId get alice =##> ratchetSyncP bobId RSAgreed get bob3 =##> ratchetSyncP aliceId RSAgreed @@ -1495,12 +1490,11 @@ testRatchetSyncSuspendForeground t = do withSmpServerStoreMsgLogOn t testPort $ \_ -> do concurrently_ - (getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP]) - (getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP]) - runRight_ $ do - get alice =##> ratchetSyncP bobId RSOk - get bob2 =##> ratchetSyncP aliceId RSOk - exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 + (get alice =##> ratchetSyncP bobId RSAgreed) + (get bob2 =##> ratchetSyncP aliceId RSAgreed) + get alice =##> ratchetSyncP bobId RSOk + get bob2 =##> ratchetSyncP aliceId RSOk + runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 disposeAgentClient alice disposeAgentClient bob disposeAgentClient bob2 @@ -1523,12 +1517,11 @@ testRatchetSyncSimultaneous t = do withSmpServerStoreMsgLogOn t testPort $ \_ -> do concurrently_ - (getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP]) - (getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP]) - runRight_ $ do - get alice =##> ratchetSyncP bobId RSOk - get bob2 =##> ratchetSyncP aliceId RSOk - exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 + (get alice =##> ratchetSyncP bobId RSAgreed) + (get bob2 =##> ratchetSyncP aliceId RSAgreed) + get alice =##> ratchetSyncP bobId RSOk + get bob2 =##> ratchetSyncP aliceId RSOk + runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 disposeAgentClient alice disposeAgentClient bob disposeAgentClient bob2 @@ -1624,6 +1617,7 @@ testActiveClientNotDisconnected t = do where keepSubscribing :: AgentClient -> ConnId -> SystemTime -> ExceptT AgentErrorType IO () keepSubscribing alice connId ts = do + atomically $ void . flushTBQueue $ subQ alice -- drain queue so subscribeConnection may proceed with UPs ts' <- liftIO getSystemTime if milliseconds ts' - milliseconds ts < 2200 then do @@ -1674,13 +1668,11 @@ testSuspendingAgentCompleteSending t = withAgentClients2 $ \a b -> do liftIO $ suspendAgent b 5000000 withSmpServerStoreLogOn t testPort $ \_ -> runRight_ @AgentErrorType $ do - pGet b =##> \case ("", c, AEvt SAEConn (SENT 5)) -> c == aId; ("", "", AEvt _ UP {}) -> True; _ -> False - pGet b =##> \case ("", c, AEvt SAEConn (SENT 5)) -> c == aId; ("", "", AEvt _ UP {}) -> True; _ -> False - pGet b =##> \case ("", c, AEvt SAEConn (SENT 6)) -> c == aId; ("", "", AEvt _ UP {}) -> True; _ -> False + pGet b =##> \case ("", c, AEvt SAEConn (SENT 5)) -> c == aId; _ -> False + pGet b =##> \case ("", c, AEvt SAEConn (SENT 6)) -> c == aId; _ -> False ("", "", SUSPENDED) <- nGet b - pGet a =##> \case ("", c, AEvt _ (Msg "hello too")) -> c == bId; ("", "", AEvt _ UP {}) -> True; _ -> False - pGet a =##> \case ("", c, AEvt _ (Msg "hello too")) -> c == bId; ("", "", AEvt _ UP {}) -> True; _ -> False + pGet a =##> \case ("", c, AEvt _ (Msg "hello too")) -> c == bId; _ -> False ackMessage a bId 5 Nothing get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False ackMessage a bId 6 Nothing @@ -1706,7 +1698,7 @@ testSuspendingAgentTimeout t = withAgentClients2 $ \a b -> do testBatchedSubscriptions :: Int -> Int -> ATransport -> IO () testBatchedSubscriptions nCreate nDel t = - withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do + withAgentClientsCfgServers2 agentCfgN agentCfgN initAgentServers2 $ \a b -> do conns <- runServers $ do conns <- replicateM nCreate $ makeConnection_ PQSupportOff a b forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId @@ -1720,10 +1712,10 @@ testBatchedSubscriptions nCreate nDel t = ("", "", DOWN {}) <- nGet b ("", "", DOWN {}) <- nGet b runServers $ do - ("", "", UP {}) <- nGet a - ("", "", UP {}) <- nGet a - ("", "", UP {}) <- nGet b - ("", "", UP {}) <- nGet b + ("", "", UP {}) <- nGetUP a + ("", "", UP {}) <- nGetUP a + ("", "", UP {}) <- nGetUP b + ("", "", UP {}) <- nGetUP b liftIO $ threadDelay 1000000 let (aIds, bIds) = unzip conns conns' = drop nDel conns @@ -1739,6 +1731,8 @@ testBatchedSubscriptions nCreate nDel t = deleteFail a bIds' deleteFail b aIds' where + agentCfgN :: AgentConfig + agentCfgN = agentCfg {tbqSize = fromIntegral nCreate} -- without a reader thread subscriber would block on notifySub when the subQ gets full subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () subscribe c cs = do r <- subscribeConnections c cs @@ -1967,13 +1961,7 @@ testWaitDelivery t = get alice ##> ("", bobId, SENT $ baseId + 3) get alice ##> ("", bobId, SENT $ baseId + 4) get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False - - liftIO $ - getInAnyOrder - bob - [ \case ("", "", AEvt SAENone (UP _ [cId])) -> cId == aliceId; _ -> False, - \case ("", cId, AEvt SAEConn (Msg "how are you?")) -> cId == aliceId; _ -> False - ] + get bob =##> \case ("", cId, Msg "how are you?") -> cId == aliceId; _ -> False ackMessage bob aliceId (baseId + 3) Nothing get bob =##> \case ("", c, Msg "message 1") -> c == aliceId; _ -> False ackMessage bob aliceId (baseId + 4) Nothing @@ -2064,7 +2052,7 @@ testWaitDeliveryTimeout t = liftIO $ threadDelay 100000 withSmpServerStoreLogOn t testPort $ \_ -> do - nGet bob =##> \case ("", "", UP _ [cId]) -> cId == aliceId; _ -> False + nGetUP bob =##> \case ("", "", UP _ [cId]) -> cId == aliceId; _ -> False liftIO $ noMessages alice "nothing else should be delivered to alice" liftIO $ noMessages bob "nothing else should be delivered to bob" where @@ -2105,12 +2093,7 @@ testWaitDeliveryTimeout2 t = get alice ##> ("", bobId, SENT $ baseId + 3) -- "message 1" not delivered - liftIO $ - getInAnyOrder - bob - [ \case ("", "", AEvt SAENone (UP _ [cId])) -> cId == aliceId; _ -> False, - \case ("", cId, AEvt SAEConn (Msg "how are you?")) -> cId == aliceId; _ -> False - ] + get bob =##> \case ("", cId, Msg "how are you?") -> cId == aliceId; _ -> False liftIO $ noMessages alice "nothing else should be delivered to alice" liftIO $ noMessages bob "nothing else should be delivered to bob" where @@ -2134,14 +2117,8 @@ testJoinConnectionAsyncReplyError t = do withSmpServerOn t testPort2 $ do get b =##> \case ("2", c, OK) -> c == aId; _ -> False confId <- withSmpServerStoreLogOn t testPort $ \_ -> do - pGet a >>= \case - ("", "", AEvt _ (UP _ [_])) -> do - ("", _, CONF confId _ "bob's connInfo") <- get a - pure confId - ("", _, AEvt _ (CONF confId _ "bob's connInfo")) -> do - ("", "", UP _ [_]) <- nGet a - pure confId - r -> error $ "unexpected response " <> show r + ("", _, CONF confId _ "bob's connInfo") <- get a + pure confId nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId; _ -> False runRight_ $ do allowConnectionAsync a "3" bId confId "alice's connInfo" @@ -2149,8 +2126,7 @@ testJoinConnectionAsyncReplyError t = do ConnectionStats {rcvQueuesInfo = [RcvQueueInfo {}], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId pure () withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - pGet a =##> \case ("3", c, AEvt _ OK) -> c == bId; ("", "", AEvt _ (UP _ [c])) -> c == bId; _ -> False - pGet a =##> \case ("3", c, AEvt _ OK) -> c == bId; ("", "", AEvt _ (UP _ [c])) -> c == bId; _ -> False + pGet a =##> \case ("3", c, AEvt _ OK) -> c == bId; _ -> False get a ##> ("", bId, CON) get b ##> ("", aId, INFO "alice's connInfo") get b ##> ("", aId, CON) @@ -2202,8 +2178,8 @@ testUsersNoServer t = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False liftIO $ noMessages a "nothing else should be delivered to alice" withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - nGet a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False - nGet b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False + nGetUP a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False + nGetUP b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False exchangeGreetingsMsgId 6 a bId b aId where aCfg = agentCfg {initialCleanupDelay = 10000, cleanupInterval = 10000, deleteErrorCount = 3} @@ -2770,7 +2746,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity} liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a a `hasClients` 2 exchangeGreetingsMsgId 6 a bId1 b aId1 @@ -2780,8 +2756,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a a `hasClients` 1 aUserId2 <- createUser a [noAuthSrv testSMPServer] [noAuthSrv testXFTPServer] @@ -2795,8 +2771,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a a `hasClients` 4 exchangeGreetingsMsgId 8 a bId1 b aId1 exchangeGreetingsMsgId 8 a bId1' b aId1' @@ -2809,10 +2785,10 @@ testTwoUsers = withAgentClients2 $ \a b -> do ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a a `hasClients` 2 exchangeGreetingsMsgId 10 a bId1 b aId1 exchangeGreetingsMsgId 10 a bId1' b aId1' diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 01eab9555..ac97b92f4 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -19,6 +19,7 @@ import AgentTests.FunctionalAPITests joinConnection, makeConnection, nGet, + nGetUP, runRight, runRight_, sendMessage, @@ -605,8 +606,8 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = withAgentClients2 $ \alic nGet bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False withSmpServerStoreLogOn t testPort $ \threadId -> runRight_ $ do - nGet alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False - nGet bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False + nGetUP alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False + nGetUP bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False liftIO $ threadDelay 1000000 5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again" get bob ##> ("", aliceId, SENT 5) @@ -639,11 +640,11 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = liftIO $ length (acs1 <> acs2) `shouldBe` length conns runServers $ do - ("", "", UP _ bcs1) <- nGet a - ("", "", UP _ bcs2) <- nGet a + ("", "", UP _ bcs1) <- nGetUP a + ("", "", UP _ bcs2) <- nGetUP a liftIO $ length (bcs1 <> bcs2) `shouldBe` length conns - ("", "", UP _ acs1) <- nGet b - ("", "", UP _ acs2) <- nGet b + ("", "", UP _ acs1) <- nGetUP b + ("", "", UP _ acs2) <- nGetUP b liftIO $ length (acs1 <> acs2) `shouldBe` length conns liftIO $ threadDelay 1500000 forM_ conns $ \(aliceId, bobId) -> do diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 036c8b203..ea09e4045 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -348,7 +348,7 @@ agentViaProxyRetryOffline = do withServer2 = withServer_ testStoreLogFile2 testStoreMsgsFile2 testPort2 withServer_ storeLog storeMsgs port = withSmpServerConfigOn (transport @TLS) proxyCfg {storeLogFile = Just storeLog, storeMsgsFile = Just storeMsgs} port - a `up` cId = nGet a =##> \case ("", "", UP _ [c]) -> c == cId; _ -> False + a `up` cId = nGetUP a =##> \case ("", "", UP _ [c]) -> c == cId; _ -> False a `down` cId = nGet a =##> \case ("", "", DOWN _ [c]) -> c == cId; _ -> False aCfg = agentProxyCfg {messageRetryInterval = fastMessageRetryInterval} baseId = 3