Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ setUserNetworkInfo c@AgentClient {userNetworkInfo, userNetworkUpdated} ni = with

reconnectAllServers :: AgentClient -> IO ()
reconnectAllServers c = do
reconnectServerClients c smpClients
withAgentEnv' c $ reconnectSMPServerClients c
reconnectServerClients c xftpClients
reconnectServerClients c ntfClients

Expand Down
37 changes: 36 additions & 1 deletion src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module Simplex.Messaging.Agent.Client
closeAgentClient,
closeProtocolServerClients,
reconnectServerClients,
reconnectSMPServerClients,
reconnectSMPServer,
closeXFTPServerClient,
runSMPServerTest,
Expand Down Expand Up @@ -211,7 +212,7 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues))
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues), activeToPendingQueues)
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
Expand Down Expand Up @@ -241,6 +242,7 @@ import Simplex.Messaging.Protocol
QueueIdsKeys (..),
RcvMessage (..),
RcvNtfPublicDhKey,
RecipientId,
SMPMsgMeta (..),
SProtocolType (..),
SenderCanSecure,
Expand Down Expand Up @@ -922,6 +924,39 @@ reconnectServerClients :: ProtocolServerClient v err msg => AgentClient -> (Agen
reconnectServerClients c clientsSel =
readTVarIO (clientsSel c) >>= mapM_ (forkIO . closeClient_ c)

reconnectSMPServerClients :: AgentClient -> AM' ()
reconnectSMPServerClients c = do
-- 1. swap smpClients to empty map, move active subscriptions to pending
(clients, prevActive) <- atomically $ do
clients <- smpClients c `swapTVar` M.empty
prevActive <- activeToPendingQueues (activeSubs c) (pendingSubs c)
pure (clients, prevActive)
-- 2. notify DOWN for connections that had active subscriptions
let downConns = groupConnsByServer prevActive
forM_ (M.toList downConns) $ \(server, connIds) ->
liftIO $ notifyDOWN server connIds
-- 3. close clients
mapM_ (liftIO . forkIO . closeClient_ c) clients
-- 4. resubscribe pending subscriptions
mode <- liftIO $ getSessionMode c
pending <- readTVarIO (getRcvQueues $ pendingSubs c)
-- Group transport sessions to avoid multiple UP events in case session mode is TSMUser
let tSessions = queuesToSessions pending mode
forM_ tSessions $ \tSess -> resubscribeSMPSession c tSess
where
groupConnsByServer :: Map (UserId, SMPServer, RecipientId) RcvQueue -> Map SMPServer [ConnId]
groupConnsByServer = foldl' insertConnId M.empty
where
insertConnId :: Map SMPServer [ConnId] -> RcvQueue -> Map SMPServer [ConnId]
insertConnId acc RcvQueue {server, connId} =
M.insertWith (<>) server [connId] acc
notifyDOWN :: SMPServer -> [ConnId] -> IO ()
notifyDOWN server connIds = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone (DOWN server connIds))
queuesToSessions :: Map (UserId, SMPServer, RecipientId) RcvQueue -> TransportSessionMode -> Set SMPTransportSession
queuesToSessions qs mode = case mode of
TSMEntity -> M.foldrWithKey (\(userId, srv, rId) _ acc -> S.insert (userId, srv, Just rId) acc) S.empty qs
TSMUser -> M.foldrWithKey (\(userId, srv, _) _ acc -> S.insert (userId, srv, Nothing) acc) S.empty qs

reconnectSMPServer :: AgentClient -> UserId -> SMPServer -> IO ()
reconnectSMPServer c userId srv = do
cs <- readTVarIO $ smpClients c
Expand Down
19 changes: 19 additions & 0 deletions src/Simplex/Messaging/Agent/TRcvQueues.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.TRcvQueues
deleteQueue,
getSessQueues,
getDelSessQueues,
activeToPendingQueues,
qKey,
)
where
Expand All @@ -19,6 +20,7 @@ import Control.Concurrent.STM
import Data.Foldable (foldl')
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Simplex.Messaging.Agent.Protocol (ConnId, UserId)
import Simplex.Messaging.Agent.Store (RcvQueue, StoredRcvQueue (..))
Expand Down Expand Up @@ -96,6 +98,23 @@ getDelSessQueues tSess (TRcvQueues qs cs) = do
Nothing -> (cId : removed, Nothing)
Nothing -> (removed, Nothing) -- "impossible" in invariant holds, because we get keys from the known queues

-- moves active queues to pending queues and returns queues that were active
activeToPendingQueues :: TRcvQueues -> TRcvQueues -> STM (Map (UserId, SMPServer, RecipientId) RcvQueue)
activeToPendingQueues (TRcvQueues aqs acs) (TRcvQueues pqs pcs) = do
aqs' <- mergeQueues
mergeConns
pure aqs'
where
mergeQueues :: STM (Map (UserId, SMPServer, RecipientId) RcvQueue)
mergeQueues = do
aqs' <- aqs `swapTVar` M.empty
modifyTVar pqs $ \pqs' -> M.union aqs' pqs'
pure aqs'
mergeConns :: STM ()
mergeConns = do
acs' <- acs `swapTVar` M.empty
modifyTVar pcs $ \pcs' -> M.unionWith (<>) acs' pcs'

isSession :: RcvQueue -> (UserId, SMPServer, Maybe ConnId) -> Bool
isSession rq (uId, srv, connId_) =
userId rq == uId && server rq == srv && maybe True (connId rq ==) connId_
Expand Down
11 changes: 3 additions & 8 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2828,6 +2828,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 2

exchangeGreetingsMsgId 4 a bId1 b aId1
Expand All @@ -2836,8 +2837,6 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ setNetworkConfig a nc {sessionMode = TSMUser}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 1

Expand All @@ -2851,7 +2850,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 4
Expand All @@ -2863,11 +2863,6 @@ testTwoUsers = withAgentClients2 $ \a b -> do
liftIO $ setNetworkConfig a nc {sessionMode = TSMUser}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 2
Expand Down
30 changes: 29 additions & 1 deletion tests/CoreTests/TRcvQueuesTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tRcvQueuesTests = do
it "getDelSessQueues" getDelSessQueuesTest
describe "queue transfer" $ do
it "getDelSessQueues-batchAddQueues preserves total length" removeSubsTest
it "activeToPendingQueues" activeToPendingTest

checkDataInvariant :: RQ.TRcvQueues -> IO Bool
checkDataInvariant trq = atomically $ do
Expand Down Expand Up @@ -76,7 +77,7 @@ batchIdempotentTest = do
atomically $ RQ.batchAddQueues trq qs
checkDataInvariant trq `shouldReturn` True
readTVarIO (RQ.getRcvQueues trq) `shouldReturn` qs'
fmap L.nub <$> readTVarIO (RQ.getConnections trq) `shouldReturn`cs' -- connections get duplicated, but that doesn't appear to affect anybody
fmap L.nub <$> readTVarIO (RQ.getConnections trq) `shouldReturn` cs' -- connections get duplicated, but that doesn't appear to affect anybody

deleteConnTest :: IO ()
deleteConnTest = do
Expand Down Expand Up @@ -163,6 +164,33 @@ removeSubsTest = do
atomically $ RQ.getDelSessQueues (0, "smp://1234-w==@beta", Just "c3") aq >>= RQ.batchAddQueues pq . fst
atomically (totalSize aq pq) `shouldReturn` (4, 4)

activeToPendingTest :: IO ()
activeToPendingTest = do
aq <- atomically RQ.empty
let qs1 =
[ dummyRQ 0 "smp://1234-w==@alpha" "c1",
dummyRQ 0 "smp://1234-w==@alpha" "c2"
]
atomically $ RQ.batchAddQueues aq qs1

pq <- atomically RQ.empty
let qs2 =
[ dummyRQ 0 "smp://1234-w==@beta" "c3",
dummyRQ 1 "smp://1234-w==@beta" "c4"
]
atomically $ RQ.batchAddQueues pq qs2

atomically (totalSize aq pq) `shouldReturn` (4, 4)

prevActive <- atomically $ RQ.activeToPendingQueues aq pq
atomically (totalSize aq pq) `shouldReturn` (4, 4)
M.keys <$> readTVarIO (RQ.getConnections aq) `shouldReturn` []
M.keys <$> readTVarIO (RQ.getConnections pq) `shouldReturn` ["c1", "c2", "c3", "c4"]
-- M.keys prevActive `shouldMatchList` [(0, "smp://1234-w==@alpha", ""), (0, "smp://1234-w==@alpha", "")]
Copy link
Collaborator Author

@spaced4ndy spaced4ndy Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see qKey rq = (userId rq, server rq, connId rq) in TRcvQueues.hs - why does it use connId and not rcvId? couldn't this cause some bug in case of redundant queues?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting

M.keys prevActive `shouldMatchList` [(0, "smp://1234-w==@alpha", "c1"), (0, "smp://1234-w==@alpha", "c2")]
checkDataInvariant aq `shouldReturn` True
checkDataInvariant pq `shouldReturn` True

totalSize :: RQ.TRcvQueues -> RQ.TRcvQueues -> STM (Int, Int)
totalSize a b = do
qsizeA <- M.size <$> readTVar (RQ.getRcvQueues a)
Expand Down