Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/cassandra-dc-aware-policy
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a 'filterNodesByDatacentre' config option useful during cassandra DC migration
3 changes: 2 additions & 1 deletion libs/cassandra-util/cassandra-util.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ cabal-version: 1.12
--
-- see: https://github.com/sol/hpack
--
-- hash: 9a030e92940be80f5ff4f31e38dbbddc2d24567f1114953edf9924cf61f9c43f
-- hash: 0e7f101562d82c7e04fbc1824f5bc9ef427915eacf3bd7370a2412a016a022be

name: cassandra-util
version: 0.16.5
Expand Down Expand Up @@ -35,6 +35,7 @@ library
aeson >=0.7
, base >=4.6 && <5.0
, conduit
, containers
, cql >=3.0.0
, cql-io >=0.14
, cql-io-tinylog
Expand Down
1 change: 1 addition & 0 deletions libs/cassandra-util/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies:
- aeson >=0.7
- base >=4.6 && <5.0
- conduit
- containers
- cql >=3.0.0
- cql-io >=0.14
- cql-io-tinylog
Expand Down
27 changes: 26 additions & 1 deletion libs/cassandra-util/src/Cassandra/Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ module Cassandra.Settings
( module C,
initialContactsDisco,
initialContactsPlain,
dcAwareRandomPolicy,
dcFilterPolicyIfConfigured,
)
where

import Control.Lens
import Data.Aeson.Lens
import Data.List.NonEmpty (NonEmpty (..))
import Data.Text (pack, stripSuffix, unpack)
import Database.CQL.IO as C (Policy, Settings, addContact, defSettings, setCompression, setConnectTimeout, setContacts, setIdleTimeout, setKeyspace, setLogger, setMaxConnections, setMaxStreams, setMaxTimeouts, setPolicy, setPoolStripes, setPortNumber, setPrepareStrategy, setProtocolVersion, setResponseTimeout, setRetrySettings, setSendTimeout)
import Database.CQL.IO as C hiding (values)
import Database.CQL.IO.Tinylog as C (mkLogger)
import Imports
import Network.Wreq
import qualified System.Logger as Log

-- | This function is likely only useful at Wire, as it is Wire-infra specific.
-- Given a server name and a url returning a wire-custom "disco" json (AWS describe-instances-like json), e.g.
Expand Down Expand Up @@ -62,3 +65,25 @@ initialContactsDisco (pack -> srv) url = liftIO $ do
-- | Puts the address into a list using the same signature as the other initialContacts
initialContactsPlain :: MonadIO m => Text -> m (NonEmpty String)
initialContactsPlain address = pure $ unpack address :| []

-- | Use dcAwareRandomPolicy if config option filterNodesByDatacentre is set,
-- otherwise use all available nodes with the default random policy.
--
-- This is only useful during a cassandra datacentre migration.
dcFilterPolicyIfConfigured :: Log.Logger -> Maybe Text -> IO Policy
dcFilterPolicyIfConfigured lgr mDatacentre = do
Log.info lgr $
Log.msg ("Using the following cassandra load balancing options ('Policy'):" :: Text)
. Log.field "filter_datacentre" (show mDatacentre)
maybe random dcAwareRandomPolicy mDatacentre

-- | Return hosts in random order for a given DC.
--
-- This is only useful during a cassandra datacentre migration.
dcAwareRandomPolicy :: Text -> IO Policy
dcAwareRandomPolicy dc = do
randomPolicy <- C.random
pure $ randomPolicy {acceptable = dcAcceptable}
where
dcAcceptable :: Host -> IO Bool
dcAcceptable host = pure $ (host ^. dataCentre) == dc
29 changes: 7 additions & 22 deletions libs/types-common/src/Util/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ makeLenses ''Endpoint

data CassandraOpts = CassandraOpts
{ _casEndpoint :: !Endpoint,
_casKeyspace :: !Text
_casKeyspace :: !Text,
-- | If this option is unset, use all available nodes.
-- If this option is set, use only cassandra nodes in the given datacentre
--
-- This option is most likely only necessary during a cassandra DC migration
-- FUTUREWORK: remove this option again, or support a datacentre migration feature
_casFilterNodesByDatacentre :: !(Maybe Text)
}
deriving (Show, Generic)

Expand Down Expand Up @@ -156,27 +162,6 @@ parseConfigPath defaultPath desc = do
parseAWSEndpoint :: ReadM AWSEndpoint
parseAWSEndpoint = readerAsk >>= maybe (error "Could not parse AWS endpoint") return . fromByteString . fromString

cassandraParser :: Parser CassandraOpts
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was dead code not used anywhere.

cassandraParser =
CassandraOpts
<$> ( Endpoint
<$> ( textOption $
long "cassandra-host"
<> metavar "HOSTNAME"
<> help "Cassandra hostname or address"
)
<*> ( option auto $
long "cassandra-port"
<> metavar "PORT"
<> help "Cassandra port"
)
)
<*> ( textOption $
long "cassandra-keyspace"
<> metavar "STRING"
<> help "Cassandra keyspace"
)

discoUrlParser :: Parser Text
discoUrlParser =
textOption $
Expand Down
1 change: 1 addition & 0 deletions libs/wire-api/src/Wire/API/User/Saml.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ substituteVar' var val = ST.intercalate val . ST.splitOn var

type Opts = Opts' DerivedOpts

-- FUTUREWORK: Shouldn't these types be in spar, not in wire-api?
data Opts' a = Opts
{ saml :: !SAML.Config,
brig :: !Endpoint,
Expand Down
1 change: 1 addition & 0 deletions services/brig/brig.integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cassandra:
host: 127.0.0.1
port: 9042
keyspace: brig_test
# filterNodesByDatacentre: datacenter1

elasticsearch:
url: http://127.0.0.1:9200
Expand Down
7 changes: 4 additions & 3 deletions services/brig/src/Brig/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -395,20 +395,21 @@ initCassandra :: Opts -> Logger -> IO Cas.ClientState
initCassandra o g = do
c <-
maybe
(Cas.initialContactsPlain ((Opt.cassandra o) ^. casEndpoint . epHost))
(Cas.initialContactsPlain (Opt.cassandra o ^. casEndpoint . epHost))
(Cas.initialContactsDisco "cassandra_brig")
(unpack <$> Opt.discoUrl o)
p <-
Cas.init $
Cas.setLogger (Cas.mkLogger (Log.clone (Just "cassandra.brig") g))
. Cas.setContacts (NE.head c) (NE.tail c)
. Cas.setPortNumber (fromIntegral ((Opt.cassandra o) ^. casEndpoint . epPort))
. Cas.setKeyspace (Keyspace ((Opt.cassandra o) ^. casKeyspace))
. Cas.setPortNumber (fromIntegral (Opt.cassandra o ^. casEndpoint . epPort))
. Cas.setKeyspace (Keyspace (Opt.cassandra o ^. casKeyspace))
. Cas.setMaxConnections 4
. Cas.setPoolStripes 4
. Cas.setSendTimeout 3
. Cas.setResponseTimeout 10
. Cas.setProtocolVersion Cas.V4
. Cas.setPolicy (Cas.dcFilterPolicyIfConfigured g (Opt.cassandra o ^. casFilterNodesByDatacentre))
$ Cas.defSettings
runClient p $ versionCheck schemaVersion
return p
Expand Down
1 change: 1 addition & 0 deletions services/galley/galley.integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cassandra:
host: 127.0.0.1
port: 9042
keyspace: galley_test
# filterNodesByDatacentre: datacenter1

brig:
host: 0.0.0.0
Expand Down
1 change: 1 addition & 0 deletions services/galley/src/Galley/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ initCassandra o l = do
. C.setSendTimeout 3
. C.setResponseTimeout 10
. C.setProtocolVersion C.V4
. C.setPolicy (C.dcFilterPolicyIfConfigured l (o ^. optCassandra . casFilterNodesByDatacentre))
$ C.defSettings

initHttpManager :: Opts -> IO Manager
Expand Down
1 change: 1 addition & 0 deletions services/gundeck/gundeck.integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cassandra:
host: 127.0.0.1
port: 9042
keyspace: gundeck_test
# filterNodesByDatacentre: datacenter1

redis:
host: 127.0.0.1
Expand Down
1 change: 1 addition & 0 deletions services/gundeck/src/Gundeck/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ createEnv m o = do
. C.setSendTimeout 3
. C.setResponseTimeout 10
. C.setProtocolVersion C.V4
. C.setPolicy (C.dcFilterPolicyIfConfigured l (o ^. optCassandra . casFilterNodesByDatacentre))
$ C.defSettings
a <- Aws.mkEnv l o n
io <-
Expand Down
1 change: 1 addition & 0 deletions services/spar/spar.integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cassandra:
host: 127.0.0.1
port: 9042
keyspace: spar_test
filterNodesByDatacentre: datacenter1

# Wire/AWS specific, optional
# discoUrl: "https://"
Expand Down
10 changes: 6 additions & 4 deletions services/spar/src/Spar/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,34 @@ import Spar.Orphans ()
import Spar.Sem.Logger.TinyLog (toLevel)
import System.Logger.Class (Logger)
import qualified System.Logger.Extended as Log
import Util.Options (casEndpoint, casKeyspace, epHost, epPort)
import Util.Options (casEndpoint, casFilterNodesByDatacentre, casKeyspace, epHost, epPort)
import Wire.API.User.Saml as Types

----------------------------------------------------------------------
-- cassandra

initCassandra :: Opts -> Logger -> IO ClientState
initCassandra opts lgr = do
let cassOpts = Types.cassandra opts
connectString <-
maybe
(Cas.initialContactsPlain (Types.cassandra opts ^. casEndpoint . epHost))
(Cas.initialContactsPlain (cassOpts ^. casEndpoint . epHost))
(Cas.initialContactsDisco "cassandra_spar")
(cs <$> Types.discoUrl opts)
cas <-
Cas.init $
Cas.defSettings
& Cas.setLogger (Cas.mkLogger (Log.clone (Just "cassandra.spar") lgr))
& Cas.setContacts (NE.head connectString) (NE.tail connectString)
& Cas.setPortNumber (fromIntegral $ Types.cassandra opts ^. casEndpoint . epPort)
& Cas.setKeyspace (Keyspace $ Types.cassandra opts ^. casKeyspace)
& Cas.setPortNumber (fromIntegral $ cassOpts ^. casEndpoint . epPort)
& Cas.setKeyspace (Keyspace $ cassOpts ^. casKeyspace)
& Cas.setMaxConnections 4
& Cas.setMaxStreams 128
& Cas.setPoolStripes 4
& Cas.setSendTimeout 3
& Cas.setResponseTimeout 10
& Cas.setProtocolVersion V4
& Cas.setPolicy (Cas.dcFilterPolicyIfConfigured lgr (cassOpts ^. casFilterNodesByDatacentre))
runClient cas $ Cas.versionCheck Data.schemaVersion
pure cas

Expand Down