@@ -395,19 +395,36 @@ createClientService db userId srv (kh, (cert, pk)) =
395395 |]
396396 (userId, host srv, port srv, kh, cert, pk)
397397
398- getClientService :: DB. Connection -> UserId -> SMPServer -> IO (Maybe ((C. KeyHash , TLS. Credential ), Maybe ServiceId ))
398+ getClientService :: DB. Connection -> UserId -> SMPServer -> IO (Maybe ((C. KeyHash , TLS. Credential ), Maybe ServiceSub ))
399399getClientService db userId srv =
400400 maybeFirstRow toService $
401401 DB. query
402402 db
403403 [sql |
404- SELECT service_cert_hash, service_cert, service_priv_key, rcv_service_id
404+ SELECT service_cert_hash, service_cert, service_priv_key, rcv_service_id,
405405 FROM client_services
406406 WHERE user_id = ? AND host = ? AND port = ?
407407 |]
408408 (userId, host srv, port srv)
409409 where
410- toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)
410+ toService (kh, cert, pk, serviceId_, n, idsHash) =
411+ let service_ = (\ serviceId -> ServiceSub serviceId n idsHash) <$> serviceId_
412+ in ((kh, (cert, pk)), service_)
413+
414+ updateServiceAggregates :: DB. Connection -> UserId -> SMPServer -> Int64 -> [RecipientId ] -> IO ()
415+ updateServiceAggregates db userId server change rcvIds =
416+ getClientService db userId server >>= \ case
417+ Just (_, Just (ServiceSub serviceId n idsHash)) ->
418+ DB. execute
419+ db
420+ [sql |
421+ UPDATE client_services
422+ SET service_queue_count = ?,
423+ service_queue_ids_hash = ?
424+ WHERE user_id = ? AND rcv_service_id = ?
425+ |]
426+ (n + change, idsHash <> queueIdsHash rcvIds, userId, serviceId)
427+ _ -> pure ()
411428
412429getClientServiceServers :: DB. Connection -> UserId -> IO [(SMPServer , ServiceSub )]
413430getClientServiceServers db userId =
@@ -499,8 +516,13 @@ createConnRecord db connId ConnData {userId, connAgentVersion, enableNtfs, pqSup
499516 |]
500517 (userId, connId, cMode, connAgentVersion, BI enableNtfs, pqSupport, BI True )
501518
502- deleteConnRecord :: DB. Connection -> ConnId -> IO ()
503- deleteConnRecord db connId = DB. execute db " DELETE FROM connections WHERE conn_id = ?" (Only connId)
519+ deleteConnRecord :: DB. Connection -> UserId -> ConnId -> IO ()
520+ deleteConnRecord db userId connId = do
521+ -- TODO [certs rcv] it needs to be grouped per server here
522+ rIds :: [RecipientId ] <-
523+ map fromOnly <$> DB. query db " SELECT rcv_id FROM rcv_queues WHERE conn_id = ? AND deleted = 0 AND rcv_service_assoc = 1" (Only connId)
524+ unless (null rIds) $ updateServiceAggregates db userId server (- length rIds') rIds'
525+ DB. execute db " DELETE FROM connections WHERE conn_id = ?" (Only connId)
504526
505527checkConfirmedSndQueueExists_ :: DB. Connection -> NewSndQueue -> IO Bool
506528checkConfirmedSndQueueExists_ db SndQueue {server, sndId} = do
@@ -521,8 +543,8 @@ getRcvConn db ProtocolServer {host, port} rcvId = runExceptT $ do
521543 (rq,) <$> ExceptT (getConn db connId)
522544
523545-- | Deletes connection, optionally checking for pending snd message deliveries; returns connection id if it was deleted
524- deleteConn :: DB. Connection -> Maybe NominalDiffTime -> ConnId -> IO (Maybe ConnId )
525- deleteConn db waitDeliveryTimeout_ connId = case waitDeliveryTimeout_ of
546+ deleteConn :: DB. Connection -> Maybe NominalDiffTime -> UserId -> ConnId -> IO (Maybe ConnId )
547+ deleteConn db waitDeliveryTimeout_ userId connId = case waitDeliveryTimeout_ of
526548 Nothing -> delete
527549 Just timeout ->
528550 ifM
@@ -534,7 +556,7 @@ deleteConn db waitDeliveryTimeout_ connId = case waitDeliveryTimeout_ of
534556 (pure Nothing )
535557 )
536558 where
537- delete = deleteConnRecord db connId $> Just connId
559+ delete = deleteConnRecord db userId connId $> Just connId
538560 checkNoPendingDeliveries_ = do
539561 r :: (Maybe Int64 ) <-
540562 maybeFirstRow fromOnly $
@@ -609,15 +631,13 @@ setRcvSwitchStatus db rq@RcvQueue {rcvId, server = ProtocolServer {host, port}}
609631 pure rq {rcvSwchStatus}
610632
611633setRcvQueueDeleted :: DB. Connection -> RcvQueue -> IO ()
612- setRcvQueueDeleted db RcvQueue {rcvId, server = ProtocolServer {host, port}} = do
613- DB. execute
614- db
615- [sql |
616- UPDATE rcv_queues
617- SET deleted = 1
618- WHERE host = ? AND port = ? AND rcv_id = ?
619- |]
620- (host, port, rcvId)
634+ setRcvQueueDeleted db RcvQueue {connId, dbQueueId, rcvId} = do
635+ q_ :: Maybe BoolInt <-
636+ maybeFirstRow fromOnly $
637+ DB. query db " SELECT rcv_service_assoc FROM rcv_queues WHERE conn_id = ? AND rcv_queue_id = ? AND deleted = 0" (connId, dbQueueId)
638+ forM_ q_ $ \ (BI rcvServiceAssoc) -> do
639+ DB. execute db " UPDATE rcv_queues SET deleted = 1 WHERE host = ? AND port = ? AND rcv_id = ?" (host, port, rcvId)
640+ when rcvServiceAssoc $ updateServiceAggregates db userId server (- 1 ) [rcvId]
621641
622642setRcvQueueConfirmedE2E :: DB. Connection -> RcvQueue -> C. DhSecretX25519 -> VersionSMPC -> IO ()
623643setRcvQueueConfirmedE2E db RcvQueue {rcvId, server = ProtocolServer {host, port}} e2eDhSecret smpClientVersion =
@@ -677,8 +697,13 @@ incRcvDeleteErrors db RcvQueue {connId, dbQueueId} =
677697 DB. execute db " UPDATE rcv_queues SET delete_errors = delete_errors + 1 WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId)
678698
679699deleteConnRcvQueue :: DB. Connection -> RcvQueue -> IO ()
680- deleteConnRcvQueue db RcvQueue {connId, dbQueueId} =
681- DB. execute db " DELETE FROM rcv_queues WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId)
700+ deleteConnRcvQueue db RcvQueue {connId, dbQueueId, server} = do
701+ q_ :: Maybe (BoolInt , BoolInt ) <-
702+ maybeFirstRow id $
703+ DB. query db " SELECT deleted, rcv_service_assoc FROM rcv_queues WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId)
704+ forM_ q_ $ \ (BI deleted, BI rcvServiceAssoc) -> do
705+ DB. execute db " DELETE FROM rcv_queues WHERE conn_id = ? AND rcv_queue_id = ?" (connId, dbQueueId)
706+ when (not deleted && rcvServiceAssoc) $ updateServiceAggregates db userId server (- 1 ) [rcvId]
682707
683708deleteConnSndQueue :: DB. Connection -> ConnId -> SndQueue -> IO ()
684709deleteConnSndQueue db connId SndQueue {dbQueueId} = do
@@ -687,7 +712,7 @@ deleteConnSndQueue db connId SndQueue {dbQueueId} = do
687712
688713getPrimaryRcvQueue :: DB. Connection -> ConnId -> IO (Either StoreError RcvQueue )
689714getPrimaryRcvQueue db connId =
690- maybe (Left SEConnNotFound ) (Right . L. head ) <$> getRcvQueuesByConnId_ db connId
715+ maybe (Left SEConnNotFound ) (Right . L. head ) <$> getRcvQueuesByConnId_ db connId False
691716
692717getRcvQueue :: DB. Connection -> ConnId -> SMPServer -> SMP. RecipientId -> IO (Either StoreError RcvQueue )
693718getRcvQueue db connId (SMPServer host port _) rcvId =
@@ -1015,7 +1040,7 @@ getPendingQueueMsg db connId SndQueue {dbQueueId} =
10151040 getMsgData :: InternalId -> IO (Either StoreError (Maybe RcvQueue , PendingMsgData ))
10161041 getMsgData msgId = runExceptT $ do
10171042 msg <- ExceptT $ firstRow' pendingMsgData err getMsgData_
1018- rq_ <- liftIO $ L. head <$$> getRcvQueuesByConnId_ db connId
1043+ rq_ <- liftIO $ L. head <$$> getRcvQueuesByConnId_ db connId False
10191044 pure (rq_, msg)
10201045 where
10211046 getMsgData_ =
@@ -2061,6 +2086,7 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} serverKeyHash_ = do
20612086 :. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
20622087 :. ntfCredsFields
20632088 )
2089+ when rcvServiceAssoc $ updateServiceAggregates db userId server 1 [rcvId]
20642090 pure (rq :: NewRcvQueue ) {connId = connId', dbQueueId = qId}
20652091 where
20662092 ntfCredsFields = case clientNtfCreds of
@@ -2119,21 +2145,19 @@ getDeletedConn = getAnyConn True
21192145{-# INLINE getDeletedConn #-}
21202146
21212147getAnyConn :: Bool -> DB. Connection -> ConnId -> IO (Either StoreError SomeConn )
2122- getAnyConn deleted' dbConn connId =
2123- getConnData dbConn connId >>= \ case
2148+ getAnyConn deleted dbConn connId =
2149+ getConnData deleted dbConn connId >>= \ case
21242150 Nothing -> pure $ Left SEConnNotFound
2125- Just (cData@ ConnData {deleted}, cMode)
2126- | deleted /= deleted' -> pure $ Left SEConnNotFound
2127- | otherwise -> do
2128- rQ <- getRcvQueuesByConnId_ dbConn connId
2129- sQ <- getSndQueuesByConnId_ dbConn connId
2130- pure $ case (rQ, sQ, cMode) of
2131- (Just rqs, Just sqs, CMInvitation ) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
2132- (Just (rq :| _), Nothing , CMInvitation ) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
2133- (Nothing , Just (sq :| _), CMInvitation ) -> Right $ SomeConn SCSnd (SndConnection cData sq)
2134- (Just (rq :| _), Nothing , CMContact ) -> Right $ SomeConn SCContact (ContactConnection cData rq)
2135- (Nothing , Nothing , _) -> Right $ SomeConn SCNew (NewConnection cData)
2136- _ -> Left SEConnNotFound
2151+ Just (cData, cMode) -> do
2152+ rQ <- getRcvQueuesByConnId_ dbConn connId deleted
2153+ sQ <- getSndQueuesByConnId_ dbConn connId
2154+ pure $ case (rQ, sQ, cMode) of
2155+ (Just rqs, Just sqs, CMInvitation ) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
2156+ (Just (rq :| _), Nothing , CMInvitation ) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
2157+ (Nothing , Just (sq :| _), CMInvitation ) -> Right $ SomeConn SCSnd (SndConnection cData sq)
2158+ (Just (rq :| _), Nothing , CMContact ) -> Right $ SomeConn SCContact (ContactConnection cData rq)
2159+ (Nothing , Nothing , _) -> Right $ SomeConn SCNew (NewConnection cData)
2160+ _ -> Left SEConnNotFound
21372161
21382162getConns :: DB. Connection -> [ConnId ] -> IO [Either StoreError SomeConn ]
21392163getConns = getAnyConns_ False
@@ -2149,8 +2173,8 @@ getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAn
21492173 handleDBError :: E. SomeException -> IO (Either StoreError SomeConn )
21502174 handleDBError = pure . Left . SEInternal . bshow
21512175
2152- getConnData :: DB. Connection -> ConnId -> IO (Maybe (ConnData , ConnectionMode ))
2153- getConnData db connId' =
2176+ getConnData :: Bool -> DB. Connection -> ConnId -> IO (Maybe (ConnData , ConnectionMode ))
2177+ getConnData deleted db connId' =
21542178 maybeFirstRow cData $
21552179 DB. query
21562180 db
@@ -2159,9 +2183,9 @@ getConnData db connId' =
21592183 user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs,
21602184 last_external_snd_msg_id, deleted, ratchet_sync_state, pq_support
21612185 FROM connections
2162- WHERE conn_id = ?
2186+ WHERE conn_id = ? AND deleted = ?
21632187 |]
2164- (Only connId')
2188+ (connId', BI deleted )
21652189 where
21662190 cData (userId, connId, cMode, connAgentVersion, enableNtfs_, lastExternalSndId, BI deleted, ratchetSyncState, pqSupport) =
21672191 (ConnData {userId, connId, connAgentVersion, enableNtfs = maybe True unBI enableNtfs_, lastExternalSndId, deleted, ratchetSyncState, pqSupport}, cMode)
@@ -2171,8 +2195,21 @@ setConnDeleted db waitDelivery connId
21712195 | waitDelivery = do
21722196 currentTs <- getCurrentTime
21732197 DB. execute db " UPDATE connections SET deleted_at_wait_delivery = ? WHERE conn_id = ?" (currentTs, connId)
2174- | otherwise =
2198+ | otherwise = do
21752199 DB. execute db " UPDATE connections SET deleted = ? WHERE conn_id = ?" (BI True , connId)
2200+ -- TODO [certs rcv] it needs to be grouped per server
2201+ qs :: [(Int64 , RecipientId , BoolInt )] <-
2202+ DB. query db " SELECT rcv_queue_id, rcv_id, rcv_service_assoc FROM rcv_queues WHERE conn_id = ? AND deleted = 0" (Only connId)
2203+ unless (null qs) $ do
2204+ #if defined(dbPostgres)
2205+ let dbQIds = In (map (\ (dbQId, _, _) -> dbQId) qs)
2206+ DB. execute db " UPDATE rcv_queues SET deleted = 1 WHERE conn_id = ? AND rcv_queue_id IN ?" (connId, dbQIds)
2207+ #else
2208+ let ids = map (\ (dbQId, _, _) -> (connId, dbQId)) qs
2209+ DB. executeMany db " UPDATE rcv_queues SET deleted = 1 WHERE conn_id = ? AND rcv_queue_id = ?" ids
2210+ #endif
2211+ let rIds' = mapMaybe (\ case (_, rId, BI True ) -> Just rId; _ -> Nothing ) qs
2212+ unless (null rIds') $ updateServiceAggregates db userId server (- length rIds') rIds'
21762213
21772214setConnUserId :: DB. Connection -> UserId -> ConnId -> UserId -> IO ()
21782215setConnUserId db oldUserId connId newUserId =
@@ -2218,10 +2255,10 @@ deleteRatchetKeyHashesExpired db ttl = do
22182255 DB. execute db " DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs)
22192256
22202257-- | returns all connection queues, the first queue is the primary one
2221- getRcvQueuesByConnId_ :: DB. Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue ))
2222- getRcvQueuesByConnId_ db connId =
2258+ getRcvQueuesByConnId_ :: DB. Connection -> ConnId -> Bool -> IO (Maybe (NonEmpty RcvQueue ))
2259+ getRcvQueuesByConnId_ db connId deleted =
22232260 L. nonEmpty . sortBy primaryFirst . map toRcvQueue
2224- <$> DB. query db (rcvQueueQuery <> " WHERE q.conn_id = ? AND q.deleted = 0 " ) (Only connId)
2261+ <$> DB. query db (rcvQueueQuery <> " WHERE q.conn_id = ? AND q.deleted = ? " ) (connId, deleted )
22252262 where
22262263 primaryFirst RcvQueue {primary = p, dbReplaceQueueId = i} RcvQueue {primary = p', dbReplaceQueueId = i'} =
22272264 -- the current primary queue is ordered first, the next primary - second
0 commit comments