Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cassandra-schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,26 @@ CREATE TABLE galley_test.team_conv (
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';

CREATE TABLE galley_test.mls_commit_locks (
group_id blob,
epoch bigint,
PRIMARY KEY (group_id, epoch)
) WITH CLUSTERING ORDER BY (epoch ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';

CREATE TABLE galley_test.team (
team uuid PRIMARY KEY,
binding boolean,
Expand Down
1 change: 1 addition & 0 deletions changelog.d/2-features/atomic-commits
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent race conditions in concurrent MLS commit requests.
2 changes: 2 additions & 0 deletions services/galley/galley.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ library
Galley.Cassandra.Code
Galley.Cassandra.Conversation
Galley.Cassandra.Conversation.Members
Galley.Cassandra.Conversation.MLS
Galley.Cassandra.ConversationList
Galley.Cassandra.CustomBackend
Galley.Cassandra.Instances
Expand Down Expand Up @@ -625,6 +626,7 @@ executable galley-schema
V65_MLSRemoteClients
V66_AddSplashScreen
V67_MLSFeature
V68_MLSCommitLock
Paths_galley
hs-source-dirs:
schema/src
Expand Down
4 changes: 3 additions & 1 deletion services/galley/schema/src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import qualified V64_Epoch
import qualified V65_MLSRemoteClients
import qualified V66_AddSplashScreen
import qualified V67_MLSFeature
import qualified V68_MLSCommitLock

main :: IO ()
main = do
Expand Down Expand Up @@ -125,7 +126,8 @@ main = do
V64_Epoch.migration,
V65_MLSRemoteClients.migration,
V66_AddSplashScreen.migration,
V67_MLSFeature.migration
V67_MLSFeature.migration,
V68_MLSCommitLock.migration
-- When adding migrations here, don't forget to update
-- 'schemaVersion' in Galley.Cassandra
-- (see also docs/developer/cassandra-interaction.md)
Expand Down
33 changes: 33 additions & 0 deletions services/galley/schema/src/V68_MLSCommitLock.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module V68_MLSCommitLock where

import Cassandra.Schema
import Imports
import Text.RawString.QQ

migration :: Migration
migration =
Migration 68 "Add lock table for MLS commits" $
schema'
[r| CREATE TABLE mls_commit_locks (
group_id blob,
epoch bigint,
PRIMARY KEY (group_id, epoch)
)
|]
88 changes: 61 additions & 27 deletions services/galley/src/Galley/API/MLS/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import Galley.API.Util
import Galley.Data.Conversation.Types hiding (Conversation)
import qualified Galley.Data.Conversation.Types as Data
import Galley.Data.Services
import Galley.Data.Types
import Galley.Effects
import Galley.Effects.BrigAccess
import Galley.Effects.ConversationStore
Expand All @@ -49,6 +50,7 @@ import Polysemy
import Polysemy.Error
import Polysemy.Input
import Polysemy.Internal
import Polysemy.Resource (Resource, bracket)
import Polysemy.TinyLog
import qualified System.Logger.Class as Logger
import Wire.API.Conversation.Protocol
Expand All @@ -63,6 +65,7 @@ import Wire.API.Federation.Error
import Wire.API.MLS.CipherSuite
import Wire.API.MLS.Commit
import Wire.API.MLS.Credential
import Wire.API.MLS.Group
import Wire.API.MLS.KeyPackage
import Wire.API.MLS.Message
import Wire.API.MLS.Proposal
Expand All @@ -72,7 +75,8 @@ import Wire.API.Message
postMLSMessage ::
( HasProposalEffects r,
Members
'[ Error FederationError,
'[ Resource,
Error FederationError,
ErrorS 'ConvNotFound,
Error InternalError,
ErrorS 'MLSUnsupportedMessage,
Expand Down Expand Up @@ -154,7 +158,8 @@ processCommit ::
Member (ErrorS 'MLSProposalNotFound) r,
Member (Error FederationError) r,
Member (Error InternalError) r,
Member (ErrorS 'MissingLegalholdConsent) r
Member (ErrorS 'MissingLegalholdConsent) r,
Member Resource r
) =>
Local UserId ->
ConnId ->
Expand All @@ -167,34 +172,40 @@ processCommit lusr con conv epoch sender commit = do
self <- noteS @'ConvNotFound $ getConvMember lusr conv lusr

-- check epoch number
curEpoch <-
preview (to convProtocol . _ProtocolMLS . to cnvmlsEpoch) conv
convMeta <-
preview (to convProtocol . _ProtocolMLS) conv
& noteS @'ConvNotFound

let curEpoch = cnvmlsEpoch convMeta
groupId = cnvmlsGroupId convMeta

when (epoch /= curEpoch) $ throwS @'MLSStaleMessage

when (epoch == Epoch 0) $ do
-- this is a newly created conversation, and it should contain exactly one
-- client (the creator)
case (sender, toList (lmMLSClients self)) of
(MemberSender ref, [creatorClient]) -> do
-- register the creator client
addKeyPackageRef
ref
(qUntagged lusr)
creatorClient
(qUntagged (qualifyAs lusr (Data.convId conv)))
(MemberSender _, _) ->
throw (InternalErrorWithDescription "Unexpected creator client set")
_ -> throw (mlsProtocolError "Unexpected sender")

-- process and execute proposals
action <- foldMap applyProposalRef (cProposals commit)
events <- executeProposalAction lusr con conv action

-- increment epoch number
setConversationEpoch (Data.convId conv) (succ epoch)

pure events
let ttlSeconds :: Int = 600 -- 10 minutes
withCommitLock groupId epoch (fromIntegral ttlSeconds) $ do
when (epoch == Epoch 0) $ do
-- this is a newly created conversation, and it should contain exactly one
-- client (the creator)
case (sender, toList (lmMLSClients self)) of
(MemberSender ref, [creatorClient]) -> do
-- register the creator client
addKeyPackageRef
ref
(qUntagged lusr)
creatorClient
(qUntagged (qualifyAs lusr (Data.convId conv)))
(MemberSender _, _) ->
throw (InternalErrorWithDescription "Unexpected creator client set")
_ -> throw (mlsProtocolError "Unexpected sender")

-- process and execute proposals
action <- foldMap applyProposalRef (cProposals commit)
events <- executeProposalAction lusr con conv action

-- increment epoch number
setConversationEpoch (Data.convId conv) (succ epoch)

pure events

applyProposalRef ::
( HasProposalEffects r,
Expand Down Expand Up @@ -414,3 +425,26 @@ instance
HandleMLSProposalFailure (Error e) r
where
handleMLSProposalFailure = mapError (MLSProposalFailure . toWai)

withCommitLock ::
forall r a.
( Members
'[ Resource,
ConversationStore,
ErrorS 'MLSStaleMessage
]
r
) =>
GroupId ->
Epoch ->
NominalDiffTime ->
Sem r a ->
Sem r a
withCommitLock gid epoch ttl action =
bracket
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not acquire the lock on the opening side of the bracket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

( acquireCommitLock gid epoch ttl >>= \lockAcquired ->
when (lockAcquired == NotAcquired) $
throwS @'MLSStaleMessage
)
(const $ releaseCommitLock gid epoch)
(const action)
3 changes: 3 additions & 0 deletions services/galley/src/Galley/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import Polysemy
import Polysemy.Error
import Polysemy.Input
import Polysemy.Internal (Append)
import Polysemy.Resource (Resource, runResource)
import qualified Polysemy.TinyLog as P
import qualified Servant
import Ssl.Util
Expand All @@ -114,6 +115,7 @@ type GalleyEffects0 =
-- federation errors can be thrown by almost every endpoint, so we avoid
-- having to declare it every single time, and simply handle it here
Error FederationError,
Resource,
Embed IO,
Final IO
]
Expand Down Expand Up @@ -226,6 +228,7 @@ evalGalley :: Env -> Sem GalleyEffects a -> IO a
evalGalley e =
runFinal @IO
. embedToFinal @IO
. runResource
. interpretWaiErrorToException
. interpretWaiErrorToException
. interpretWaiErrorToException
Expand Down
2 changes: 1 addition & 1 deletion services/galley/src/Galley/Cassandra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ module Galley.Cassandra (schemaVersion) where
import Imports

schemaVersion :: Int32
schemaVersion = 67
schemaVersion = 68
3 changes: 3 additions & 0 deletions services/galley/src/Galley/Cassandra/Conversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Data.Range
import qualified Data.Set as Set
import Data.UUID.V4 (nextRandom)
import Galley.Cassandra.Access
import Galley.Cassandra.Conversation.MLS
import Galley.Cassandra.Conversation.Members
import qualified Galley.Cassandra.Queries as Cql
import Galley.Cassandra.Store
Expand Down Expand Up @@ -296,3 +297,5 @@ interpretConversationStoreToCassandra = interpret $ \case
SetConversationEpoch cid epoch -> embedClient $ updateConvEpoch cid epoch
DeleteConversation cid -> embedClient $ deleteConversation cid
SetGroupId gId cid -> embedClient $ mapGroupId gId cid
AcquireCommitLock gId epoch ttl -> embedClient $ acquireCommitLock gId epoch ttl
ReleaseCommitLock gId epoch -> embedClient $ releaseCommitLock gId epoch
56 changes: 56 additions & 0 deletions services/galley/src/Galley/Cassandra/Conversation/MLS.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Galley.Cassandra.Conversation.MLS where

import Cassandra
import Cassandra.Settings (fromRow)
import Data.Time
import qualified Galley.Cassandra.Queries as Cql
import Galley.Data.Types
import Imports
import Wire.API.MLS.Group
import Wire.API.MLS.Message

acquireCommitLock :: GroupId -> Epoch -> NominalDiffTime -> Client LockAcquired
acquireCommitLock groupId epoch ttl = do
rows <-
retry x5 $
trans
Cql.acquireCommitLock
( params
LocalQuorum
(groupId, epoch, round ttl)
)
pure $
if checkTransSuccess rows
then Acquired
else NotAcquired

releaseCommitLock :: GroupId -> Epoch -> Client ()
releaseCommitLock groupId epoch =
retry x5 $
write
Cql.releaseCommitLock
( params
LocalQuorum
(groupId, epoch)
)

checkTransSuccess :: [Row] -> Bool
checkTransSuccess [] = False
checkTransSuccess (row : _) = either (const False) (fromMaybe False) $ fromRow 0 row
6 changes: 6 additions & 0 deletions services/galley/src/Galley/Cassandra/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ addLocalMLSClients = "update member set mls_clients = mls_clients + ? where conv
addRemoteMLSClients :: PrepQuery W (C.Set ClientId, ConvId, Domain, UserId) ()
addRemoteMLSClients = "update member_remote_user set mls_clients = mls_clients + ? where conv = ? and user_remote_domain = ? and user_remote_id = ?"

acquireCommitLock :: PrepQuery W (GroupId, Epoch, Int32) Row
acquireCommitLock = "insert into mls_commit_locks (group_id, epoch) values (?, ?) if not exists using ttl ?"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised that this works. I was under the impression that you can't have TTL values as "question mark" parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tried it locally by temporarily removing the finishing part of the bracket and observed the entries appearing and finally disappearing after the TTL expired. Would this manual test count or should I look deeper into it?


releaseCommitLock :: PrepQuery W (GroupId, Epoch) ()
releaseCommitLock = "delete from mls_commit_locks where group_id = ? and epoch = ?"

-- Services -----------------------------------------------------------------

rmSrv :: PrepQuery W (ProviderId, ServiceId) ()
Expand Down
6 changes: 6 additions & 0 deletions services/galley/src/Galley/Data/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module Galley.Data.Types
toCode,
generate,
mkKey,
LockAcquired (..),
)
where

Expand Down Expand Up @@ -87,3 +88,8 @@ mkKey :: MonadIO m => ConvId -> m Key
mkKey cnv = do
sha256 <- liftIO $ fromJust <$> getDigestByName "SHA256"
pure $ Key . unsafeRange . Ascii.encodeBase64Url . BS.take 15 $ digestBS sha256 (toByteString' cnv)

data LockAcquired
= Acquired
| NotAcquired
deriving (Show, Eq)
8 changes: 8 additions & 0 deletions services/galley/src/Galley/Effects/ConversationStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,20 @@ module Galley.Effects.ConversationStore

-- * Delete conversation
deleteConversation,

-- * MLS commit lock management
acquireCommitLock,
releaseCommitLock,
)
where

import Data.Id
import Data.Misc
import Data.Qualified
import Data.Range
import Data.Time (NominalDiffTime)
import Galley.Data.Conversation
import Galley.Data.Types
import Galley.Types.Conversations.Members
import Imports
import Polysemy
Expand Down Expand Up @@ -81,6 +87,8 @@ data ConversationStore m a where
SetConversationMessageTimer :: ConvId -> Maybe Milliseconds -> ConversationStore m ()
SetConversationEpoch :: ConvId -> Epoch -> ConversationStore m ()
SetGroupId :: GroupId -> Qualified ConvId -> ConversationStore m ()
AcquireCommitLock :: GroupId -> Epoch -> NominalDiffTime -> ConversationStore m LockAcquired
ReleaseCommitLock :: GroupId -> Epoch -> ConversationStore m ()

makeSem ''ConversationStore

Expand Down
Loading