From 5c4434963468d5907a3cb85b826c032ef746c418 Mon Sep 17 00:00:00 2001 From: jschaul Date: Tue, 26 Oct 2021 16:19:33 +0200 Subject: [PATCH 1/5] cassandra-util: initial version of a dcAwarePolicy for talking to cassandra --- libs/cassandra-util/cassandra-util.cabal | 3 ++- libs/cassandra-util/package.yaml | 1 + libs/cassandra-util/src/Cassandra/Settings.hs | 19 ++++++++++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/libs/cassandra-util/cassandra-util.cabal b/libs/cassandra-util/cassandra-util.cabal index a26223c7aa..d68adc8251 100644 --- a/libs/cassandra-util/cassandra-util.cabal +++ b/libs/cassandra-util/cassandra-util.cabal @@ -4,7 +4,7 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: 9a030e92940be80f5ff4f31e38dbbddc2d24567f1114953edf9924cf61f9c43f +-- hash: 0e7f101562d82c7e04fbc1824f5bc9ef427915eacf3bd7370a2412a016a022be name: cassandra-util version: 0.16.5 @@ -35,6 +35,7 @@ library aeson >=0.7 , base >=4.6 && <5.0 , conduit + , containers , cql >=3.0.0 , cql-io >=0.14 , cql-io-tinylog diff --git a/libs/cassandra-util/package.yaml b/libs/cassandra-util/package.yaml index ca9dc4318b..cb0241dc84 100644 --- a/libs/cassandra-util/package.yaml +++ b/libs/cassandra-util/package.yaml @@ -12,6 +12,7 @@ dependencies: - aeson >=0.7 - base >=4.6 && <5.0 - conduit +- containers - cql >=3.0.0 - cql-io >=0.14 - cql-io-tinylog diff --git a/libs/cassandra-util/src/Cassandra/Settings.hs b/libs/cassandra-util/src/Cassandra/Settings.hs index f11c0b7a6c..e03538dc6c 100644 --- a/libs/cassandra-util/src/Cassandra/Settings.hs +++ b/libs/cassandra-util/src/Cassandra/Settings.hs @@ -23,6 +23,8 @@ module Cassandra.Settings ( module C, initialContactsDisco, initialContactsPlain, + dcAwareRandomPolicy, + dcAwareSettings, ) where @@ -30,7 +32,7 @@ import Control.Lens import Data.Aeson.Lens import Data.List.NonEmpty (NonEmpty (..)) import Data.Text (pack, stripSuffix, unpack) -import Database.CQL.IO as C (Policy, Settings, addContact, 
defSettings, setCompression, setConnectTimeout, setContacts, setIdleTimeout, setKeyspace, setLogger, setMaxConnections, setMaxStreams, setMaxTimeouts, setPolicy, setPoolStripes, setPortNumber, setPrepareStrategy, setProtocolVersion, setResponseTimeout, setRetrySettings, setSendTimeout) +import Database.CQL.IO as C hiding (values) import Database.CQL.IO.Tinylog as C (mkLogger) import Imports import Network.Wreq @@ -62,3 +64,18 @@ initialContactsDisco (pack -> srv) url = liftIO $ do -- | Puts the address into a list using the same signature as the other initialContacts initialContactsPlain :: MonadIO m => Text -> m (NonEmpty String) initialContactsPlain address = pure $ unpack address :| [] + +-- | Return hosts in random order for a given DC. +-- +-- TODO: validate/guard against invalid datacentre names at service startup time +-- TODO: do we want to protect against a misconfigured datacentre during runtime while a datacentre migration is ongoing? Maybe not? +dcAwareRandomPolicy :: Text -> IO Policy +dcAwareRandomPolicy dc = do + randomPolicy <- C.random + pure $ randomPolicy {acceptable = dcAcceptable} + where + dcAcceptable :: Host -> IO Bool + dcAcceptable host = pure $ (host ^. 
dataCentre) == dc + +dcAwareSettings :: Text -> Settings +dcAwareSettings dc = setPolicy (dcAwareRandomPolicy dc) defSettings From 6da4ac16b7f2fcf0f847b0298ecbd24fb9d4741d Mon Sep 17 00:00:00 2001 From: jschaul Date: Tue, 26 Oct 2021 17:13:17 +0200 Subject: [PATCH 2/5] Spar: allow filtering nodes by datacentre --- libs/types-common/src/Util/Options.hs | 29 ++++++------------------- libs/wire-api/src/Wire/API/User/Saml.hs | 1 + services/spar/spar.integration.yaml | 1 + services/spar/src/Spar/Run.hs | 11 +++++++++- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/libs/types-common/src/Util/Options.hs b/libs/types-common/src/Util/Options.hs index 33313e9a5c..1897caa8e6 100644 --- a/libs/types-common/src/Util/Options.hs +++ b/libs/types-common/src/Util/Options.hs @@ -87,7 +87,13 @@ makeLenses ''Endpoint data CassandraOpts = CassandraOpts { _casEndpoint :: !Endpoint, - _casKeyspace :: !Text + _casKeyspace :: !Text, + -- | If this option is unset, use all available nodes. + -- If this option is set, use only cassandra nodes in the given datacentre + -- + -- This option is most likely only necessary during a cassandra DC migration + -- FUTUREWORK: remove this option again, or support a datacentre migration feature + _casFilterNodesByDatacentre :: !(Maybe Text) } deriving (Show, Generic) @@ -156,27 +162,6 @@ parseConfigPath defaultPath desc = do parseAWSEndpoint :: ReadM AWSEndpoint parseAWSEndpoint = readerAsk >>= maybe (error "Could not parse AWS endpoint") return . fromByteString . 
fromString -cassandraParser :: Parser CassandraOpts -cassandraParser = - CassandraOpts - <$> ( Endpoint - <$> ( textOption $ - long "cassandra-host" - <> metavar "HOSTNAME" - <> help "Cassandra hostname or address" - ) - <*> ( option auto $ - long "cassandra-port" - <> metavar "PORT" - <> help "Cassandra port" - ) - ) - <*> ( textOption $ - long "cassandra-keyspace" - <> metavar "STRING" - <> help "Cassandra keyspace" - ) - discoUrlParser :: Parser Text discoUrlParser = textOption $ diff --git a/libs/wire-api/src/Wire/API/User/Saml.hs b/libs/wire-api/src/Wire/API/User/Saml.hs index c9b21fdf3a..7c7c99fed5 100644 --- a/libs/wire-api/src/Wire/API/User/Saml.hs +++ b/libs/wire-api/src/Wire/API/User/Saml.hs @@ -88,6 +88,7 @@ substituteVar' var val = ST.intercalate val . ST.splitOn var type Opts = Opts' DerivedOpts +-- FUTUREWORK: Shouldn't these types be in spar, not in wire-api? data Opts' a = Opts { saml :: !SAML.Config, brig :: !Endpoint, diff --git a/services/spar/spar.integration.yaml b/services/spar/spar.integration.yaml index 6b40d80950..77792b4ee1 100644 --- a/services/spar/spar.integration.yaml +++ b/services/spar/spar.integration.yaml @@ -27,6 +27,7 @@ cassandra: host: 127.0.0.1 port: 9042 keyspace: spar_test + filterNodesByDatacentre: datacenter1 # Wire/AWS specific, optional # discoUrl: "https://" diff --git a/services/spar/src/Spar/Run.hs b/services/spar/src/Spar/Run.hs index 8dd7bd7439..cf9f0284bd 100644 --- a/services/spar/src/Spar/Run.hs +++ b/services/spar/src/Spar/Run.hs @@ -54,7 +54,7 @@ import Spar.Orphans () import Spar.Sem.Logger.TinyLog (toLevel) import System.Logger.Class (Logger) import qualified System.Logger.Extended as Log -import Util.Options (casEndpoint, casKeyspace, epHost, epPort) +import Util.Options (casEndpoint, casFilterNodesByDatacentre, casKeyspace, epHost, epPort) import Wire.API.User.Saml as Types ---------------------------------------------------------------------- @@ -80,8 +80,17 @@ initCassandra opts lgr = do & 
Cas.setSendTimeout 3 & Cas.setResponseTimeout 10 & Cas.setProtocolVersion V4 + & Cas.setPolicy policy runClient cas $ Cas.versionCheck Data.schemaVersion pure cas + where + -- Use FilterNodesByDatacentre if set, otherwise use all available nodes + -- TODO: Discuss whether to fail startup of service here if datacentre doesn't exist. + policy :: IO Cas.Policy + policy = do + let filterOptions = Types.cassandra opts ^. casFilterNodesByDatacentre + Log.info lgr $ Log.msg ("Using the following cassandra load balancing options:" :: Text) . Log.field "filter_datacentre" (show filterOptions) + maybe Cas.random Cas.dcAwareRandomPolicy filterOptions ---------------------------------------------------------------------- -- servant / wai / warp From 3a3dac193b1b9e16fa6a8982fbc3aecae7002c73 Mon Sep 17 00:00:00 2001 From: jschaul Date: Wed, 3 Nov 2021 21:42:49 +0100 Subject: [PATCH 3/5] refactor policy code to make it reusable across services --- libs/cassandra-util/src/Cassandra/Settings.hs | 20 +++++++++++++------ services/spar/src/Spar/Run.hs | 17 +++++----------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/libs/cassandra-util/src/Cassandra/Settings.hs b/libs/cassandra-util/src/Cassandra/Settings.hs index e03538dc6c..ae289828c0 100644 --- a/libs/cassandra-util/src/Cassandra/Settings.hs +++ b/libs/cassandra-util/src/Cassandra/Settings.hs @@ -24,7 +24,7 @@ module Cassandra.Settings initialContactsDisco, initialContactsPlain, dcAwareRandomPolicy, - dcAwareSettings, + dcFilterPolicyIfConfigured, ) where @@ -36,6 +36,7 @@ import Database.CQL.IO as C hiding (values) import Database.CQL.IO.Tinylog as C (mkLogger) import Imports import Network.Wreq +import qualified System.Logger as Log -- | This function is likely only useful at Wire, as it is Wire-infra specific. -- Given a server name and a url returning a wire-custom "disco" json (AWS describe-instances-like json), e.g. 
@@ -65,10 +66,20 @@ initialContactsDisco (pack -> srv) url = liftIO $ do initialContactsPlain :: MonadIO m => Text -> m (NonEmpty String) initialContactsPlain address = pure $ unpack address :| [] +-- | Use dcAwareRandomPolicy if config option filterNodesByDatacentre is set, +-- otherwise use all available nodes with the default random policy. +-- +-- This is only useful during a cassandra datacentre migration. +dcFilterPolicyIfConfigured :: Log.Logger -> Maybe Text -> IO Policy +dcFilterPolicyIfConfigured lgr mDatacentre = do + Log.info lgr $ + Log.msg ("Using the following cassandra load balancing options ('Policy'):" :: Text) + . Log.field "filter_datacentre" (show mDatacentre) + maybe random dcAwareRandomPolicy mDatacentre + -- | Return hosts in random order for a given DC. -- --- TODO: validate/guard against invalid datacentre names at service startup time --- TODO: do we want to protect against a misconfigured datacentre during runtime while a datacentre migration is ongoing? Maybe not? +-- This is only useful during a cassandra datacentre migration. dcAwareRandomPolicy :: Text -> IO Policy dcAwareRandomPolicy dc = do randomPolicy <- C.random @@ -76,6 +87,3 @@ dcAwareRandomPolicy dc = do where dcAcceptable :: Host -> IO Bool dcAcceptable host = pure $ (host ^. dataCentre) == dc - -dcAwareSettings :: Text -> Settings -dcAwareSettings dc = setPolicy (dcAwareRandomPolicy dc) defSettings diff --git a/services/spar/src/Spar/Run.hs b/services/spar/src/Spar/Run.hs index cf9f0284bd..325d96665e 100644 --- a/services/spar/src/Spar/Run.hs +++ b/services/spar/src/Spar/Run.hs @@ -62,9 +62,10 @@ import Wire.API.User.Saml as Types initCassandra :: Opts -> Logger -> IO ClientState initCassandra opts lgr = do + let cassOpts = Types.cassandra opts connectString <- maybe - (Cas.initialContactsPlain (Types.cassandra opts ^. casEndpoint . epHost)) + (Cas.initialContactsPlain (cassOpts ^. casEndpoint . 
epHost)) (Cas.initialContactsDisco "cassandra_spar") (cs <$> Types.discoUrl opts) cas <- @@ -72,25 +73,17 @@ initCassandra opts lgr = do Cas.defSettings & Cas.setLogger (Cas.mkLogger (Log.clone (Just "cassandra.spar") lgr)) & Cas.setContacts (NE.head connectString) (NE.tail connectString) - & Cas.setPortNumber (fromIntegral $ Types.cassandra opts ^. casEndpoint . epPort) - & Cas.setKeyspace (Keyspace $ Types.cassandra opts ^. casKeyspace) + & Cas.setPortNumber (fromIntegral $ cassOpts ^. casEndpoint . epPort) + & Cas.setKeyspace (Keyspace $ cassOpts ^. casKeyspace) & Cas.setMaxConnections 4 & Cas.setMaxStreams 128 & Cas.setPoolStripes 4 & Cas.setSendTimeout 3 & Cas.setResponseTimeout 10 & Cas.setProtocolVersion V4 - & Cas.setPolicy policy + & Cas.setPolicy (Cas.dcFilterPolicyIfConfigured lgr (cassOpts ^. casFilterNodesByDatacentre)) runClient cas $ Cas.versionCheck Data.schemaVersion pure cas - where - -- Use FilterNodesByDatacentre if set, otherwise use all available nodes - -- TODO: Discuss whether to fail startup of service here if datacentre doesn't exist. - policy :: IO Cas.Policy - policy = do - let filterOptions = Types.cassandra opts ^. casFilterNodesByDatacentre - Log.info lgr $ Log.msg ("Using the following cassandra load balancing options:" :: Text) . 
Log.field "filter_datacentre" (show filterOptions) - maybe Cas.random Cas.dcAwareRandomPolicy filterOptions ---------------------------------------------------------------------- -- servant / wai / warp From e65d8055fb0cbf55104c6664bf87990b902b8093 Mon Sep 17 00:00:00 2001 From: jschaul Date: Wed, 3 Nov 2021 22:02:29 +0100 Subject: [PATCH 4/5] Set dcFilterPolicyIfConfigured also for brig/galley/gundeck --- services/brig/brig.integration.yaml | 1 + services/brig/src/Brig/App.hs | 7 ++++--- services/galley/galley.integration.yaml | 1 + services/galley/src/Galley/App.hs | 1 + services/gundeck/gundeck.integration.yaml | 1 + services/gundeck/src/Gundeck/Env.hs | 1 + 6 files changed, 9 insertions(+), 3 deletions(-) diff --git a/services/brig/brig.integration.yaml b/services/brig/brig.integration.yaml index ee9b28c2f2..b5ca314dfb 100644 --- a/services/brig/brig.integration.yaml +++ b/services/brig/brig.integration.yaml @@ -7,6 +7,7 @@ cassandra: host: 127.0.0.1 port: 9042 keyspace: brig_test + # filterNodesByDatacentre: datacenter1 elasticsearch: url: http://127.0.0.1:9200 diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs index 0089940425..98fca2d7d3 100644 --- a/services/brig/src/Brig/App.hs +++ b/services/brig/src/Brig/App.hs @@ -395,20 +395,21 @@ initCassandra :: Opts -> Logger -> IO Cas.ClientState initCassandra o g = do c <- maybe - (Cas.initialContactsPlain ((Opt.cassandra o) ^. casEndpoint . epHost)) + (Cas.initialContactsPlain (Opt.cassandra o ^. casEndpoint . epHost)) (Cas.initialContactsDisco "cassandra_brig") (unpack <$> Opt.discoUrl o) p <- Cas.init $ Cas.setLogger (Cas.mkLogger (Log.clone (Just "cassandra.brig") g)) . Cas.setContacts (NE.head c) (NE.tail c) - . Cas.setPortNumber (fromIntegral ((Opt.cassandra o) ^. casEndpoint . epPort)) - . Cas.setKeyspace (Keyspace ((Opt.cassandra o) ^. casKeyspace)) + . Cas.setPortNumber (fromIntegral (Opt.cassandra o ^. casEndpoint . epPort)) + . Cas.setKeyspace (Keyspace (Opt.cassandra o ^. 
casKeyspace)) . Cas.setMaxConnections 4 . Cas.setPoolStripes 4 . Cas.setSendTimeout 3 . Cas.setResponseTimeout 10 . Cas.setProtocolVersion Cas.V4 + . Cas.setPolicy (Cas.dcFilterPolicyIfConfigured g (Opt.cassandra o ^. casFilterNodesByDatacentre)) $ Cas.defSettings runClient p $ versionCheck schemaVersion return p diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index 5a617f7e3b..16ab778ef1 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -7,6 +7,7 @@ cassandra: host: 127.0.0.1 port: 9042 keyspace: galley_test + # filterNodesByDatacentre: datacenter1 brig: host: 0.0.0.0 diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index 3d938a8241..99ae177695 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -230,6 +230,7 @@ initCassandra o l = do . C.setSendTimeout 3 . C.setResponseTimeout 10 . C.setProtocolVersion C.V4 + . C.setPolicy (C.dcFilterPolicyIfConfigured l (o ^. optCassandra . casFilterNodesByDatacentre)) $ C.defSettings initHttpManager :: Opts -> IO Manager diff --git a/services/gundeck/gundeck.integration.yaml b/services/gundeck/gundeck.integration.yaml index edcd47d014..e33c0e6cc9 100644 --- a/services/gundeck/gundeck.integration.yaml +++ b/services/gundeck/gundeck.integration.yaml @@ -7,6 +7,7 @@ cassandra: host: 127.0.0.1 port: 9042 keyspace: gundeck_test + # filterNodesByDatacentre: datacenter1 redis: host: 127.0.0.1 diff --git a/services/gundeck/src/Gundeck/Env.hs b/services/gundeck/src/Gundeck/Env.hs index 3c9888281d..1a0c7d4298 100644 --- a/services/gundeck/src/Gundeck/Env.hs +++ b/services/gundeck/src/Gundeck/Env.hs @@ -93,6 +93,7 @@ createEnv m o = do . C.setSendTimeout 3 . C.setResponseTimeout 10 . C.setProtocolVersion C.V4 + . C.setPolicy (C.dcFilterPolicyIfConfigured l (o ^. optCassandra . 
casFilterNodesByDatacentre)) $ C.defSettings a <- Aws.mkEnv l o n io <- From 827a2579c443fb6924af6d5e855fb21629200e69 Mon Sep 17 00:00:00 2001 From: jschaul Date: Wed, 3 Nov 2021 22:18:33 +0100 Subject: [PATCH 5/5] changelog --- changelog.d/5-internal/cassandra-dc-aware-policy | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5-internal/cassandra-dc-aware-policy diff --git a/changelog.d/5-internal/cassandra-dc-aware-policy b/changelog.d/5-internal/cassandra-dc-aware-policy new file mode 100644 index 0000000000..b574e655f5 --- /dev/null +++ b/changelog.d/5-internal/cassandra-dc-aware-policy @@ -0,0 +1 @@ +Add an optional 'filterNodesByDatacentre' cassandra config option (brig, galley, gundeck, spar); when set, only cassandra nodes in the given datacentre are used. Useful during a cassandra DC migration