diff --git a/.envrc b/.envrc index 18a26cfccf..02c85a3c19 100644 --- a/.envrc +++ b/.envrc @@ -37,3 +37,7 @@ path_add "PYTHONPATH" "./hack/python" # Locale export LC_ALL=en_US.UTF-8 export LANG=en_US.UTF-8 + +# RabbitMQ +export RABBITMQ_USERNAME=guest +export RABBITMQ_PASSWORD=alpaca-grapefruit \ No newline at end of file diff --git a/cabal.project b/cabal.project index 7c6a372b95..66ba354eeb 100644 --- a/cabal.project +++ b/cabal.project @@ -31,6 +31,7 @@ packages: , libs/wire-api-federation/ , libs/wire-message-proto-lens/ , libs/zauth/ + , services/background-worker/ , services/brig/ , services/cannon/ , services/cargohold/ @@ -66,6 +67,8 @@ package assets ghc-options: -Werror package auto-whitelist ghc-options: -Werror +package background-worker + ghc-options: -Werror package bilge ghc-options: -Werror package billing-team-member-backfill diff --git a/changelog.d/0-release-notes/background-worker b/changelog.d/0-release-notes/background-worker new file mode 100644 index 0000000000..34c3cf7ee5 --- /dev/null +++ b/changelog.d/0-release-notes/background-worker @@ -0,0 +1,38 @@ +This release introduces a new component: background-worker. This is currently +only used to forward notifications to federated backends. Enabling federation in +the wire-server helm chart automatically installs this component. + +When federation is enabled, wire-server will require running RabbitMQ. The helm +chart in `rabbitmq` can be used to install RabbitMQ. Please refer to the +documentation at https://docs.wire.com to install RabbitMQ in Kubernetes. These +new configurations are required: + +```yaml +brig: + config: + rabbitmq: + host: rabbitmq + port: 5672 + vHost: / + secrets: + rabbitmq: + username: + password: +background-worker: + config: + rabbitmq: + host: rabbitmq + port: 5672 + vHost: / + remoteDomains: [] + secrets: + rabbitmq: + username: + password: +``` + +The above are the default values (except for secrets, which do not have +defaults), if they work they are not required to be configured. +`background-worker.config.remoteDomains` should contain all the remote domains +with which the wire-server instance allows federating. This change is +incompatible with open-federation. \ No newline at end of file diff --git a/charts/background-worker/templates/deployment.yaml b/charts/background-worker/templates/deployment.yaml index 5ea3f8d8d6..4891ba0fdf 100644 --- a/charts/background-worker/templates/deployment.yaml +++ b/charts/background-worker/templates/deployment.yaml @@ -9,12 +9,11 @@ metadata: heritage: {{ .Release.Service }} spec: replicas: {{ .Values.replicaCount }} - # TODO(elland): Review this strategy: - type: RollingUpdate - rollingUpdate: - maxUnavailable: 0 - maxSurge: {{ .Values.replicaCount }} + # Ensures only one version of the background worker is running at any given + # moment. This means small downtime, but the background workers should be + # able to catch up. + type: Recreate selector: matchLabels: app: background-worker diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index 92ecda1665..dee71178f2 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -9,12 +9,11 @@ resources: cpu: "100m" limits: memory: "512Mi" -# TODO(elland): Create issue for a metrics endpoint +# FUTUREWORK: Implement metrics # metrics: # serviceMonitor: # enabled: false config: - # TODO(elland): Proper logging logLevel: Info logFormat: StructuredJSON rabbitmq: diff --git a/charts/integration/templates/integration-integration.yaml b/charts/integration/templates/integration-integration.yaml index 58876020c3..98a0f3df7e 100644 --- a/charts/integration/templates/integration-integration.yaml +++ b/charts/integration/templates/integration-integration.yaml @@ -71,6 +71,12 @@ spec: - name: AWS_REGION value: "eu-west-1" - name: RABBITMQ_USERNAME - value: "guest" + valueFrom: + secretKeyRef: + name: brig + key: rabbitmqUsername - name: RABBITMQ_PASSWORD - value: "guest" + valueFrom: + secretKeyRef: + name: brig + key: rabbitmqPassword diff --git a/charts/wire-server/requirements.yaml b/charts/wire-server/requirements.yaml index 8a1da9b06a..9a2ff8649b 100644 --- a/charts/wire-server/requirements.yaml +++ b/charts/wire-server/requirements.yaml @@ -111,6 +111,14 @@ dependencies: - federation - haskellServices - services +- name: background-worker + version: "0.0.42" + repository: "file://../background-worker" + tags: + - background-worker + - federation + - haskellServices + - services - name: sftd version: "0.0.42" repository: "file://../sftd" diff --git a/docs/src/how-to/install/helm-prod.md b/docs/src/how-to/install/helm-prod.md index 29045f1818..e19f8ad8b3 100644 --- a/docs/src/how-to/install/helm-prod.md +++ b/docs/src/how-to/install/helm-prod.md @@ -155,6 +155,32 @@ cp values/wire-server/prod-secrets.example.yaml my-wire-server/secrets.yaml cp values/wire-server/prod-values.example.yaml my-wire-server/values.yaml ``` +## How to install RabbitMQ + +This is only required when federation needs to be enabled. + +1. Generate password for rabbitmq: + + ```shell + openssl rand -base64 64 | env LC_CTYPE=C tr -dc a-zA-Z0-9 | head -c 42 > my-wire-server/rabbitmq-password + ``` + +2. Copy example values + + ```shell + cp values/rabbitmq/prod-secrets.example.yaml values/rabbitmq/secrets.yaml + cp values/rabbitmq/prod-values.example.yaml values/rabbitmq/values.yaml + ``` + +3. Add the generated secret from `my-wire-server/rabbitmq-password` to + `values/rabbitmq/secrets.yaml` under `rabbitmq.auth.password`. + +4. Install the helm chart using: + + ```shell + helm upgrade --install rabbitmq wire/rabbitmq -f values/rabbitmq/values.yaml -f values/rabbitmq/secrets.yaml + ``` + ## How to configure real SMTP (email) services In order for users to interact with their wire account, they need to receive mail from your wire server. @@ -189,9 +215,10 @@ apt install docker-ce sudo docker run --rm quay.io/wire/alpine-intermediate /dist/zauth -m gen-keypair -i 1 > my-wire-server/zauth.txt ``` -1. Add the generated secret from my-wire-server/restund.txt to my-wire-serwer/secrets.yaml under `brig.secrets.turn.secret` -2. add **both** the public and private parts from zauth.txt to secrets.yaml under `brig.secrets.zAuth` -3. Add the public key from zauth.txt to secrets.yaml under `nginz.secrets.zAuth.publicKeys` +1. Add the generated secret from `my-wire-server/restund.txt` to `my-wire-server/secrets.yaml` under `brig.secrets.turn.secret`. +2. add **both** the public and private parts from `my-wire-server/zauth.txt` to `my-wire-server/secrets.yaml` under `brig.secrets.zAuth`. +3. Add the public key from `my-wire-server/zauth.txt` to `my-wire-server/secrets.yaml` under `nginz.secrets.zAuth.publicKeys`. +4. Add the generated secret from my-wire-server/rabbitmq-password to `my-wire-server/secerts.yaml` under `brig.secrets.rabbitmq.password` and `background-worker.secrets.rabbitmq.password`. Great, now try the installation: diff --git a/hack/bin/set-wire-server-image-version.sh b/hack/bin/set-wire-server-image-version.sh index a471a7cb39..5277b69927 100755 --- a/hack/bin/set-wire-server-image-version.sh +++ b/hack/bin/set-wire-server-image-version.sh @@ -6,7 +6,7 @@ target_version=${1?$USAGE} TOP_LEVEL="$( cd "$( dirname "${BASH_SOURCE[0]}" )/../.." && pwd )" CHARTS_DIR="$TOP_LEVEL/.local/charts" -charts=(brig cannon galley gundeck spar cargohold proxy cassandra-migrations elasticsearch-index federator backoffice integration) +charts=(brig cannon galley gundeck spar cargohold proxy cassandra-migrations elasticsearch-index federator backoffice background-worker integration) for chart in "${charts[@]}"; do sed -i "s/^ tag: .*/ tag: $target_version/g" "$CHARTS_DIR/$chart/values.yaml" diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index bb92bc56e0..458dcea435 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -300,3 +300,16 @@ federator: federationStrategy: allowAll: true useSystemCAStore: false + +background-worker: + replicaCount: 1 + resources: + requests: {} + imagePullPolicy: {{ .Values.imagePullPolicy }} + config: + # See helmfile for the real value + remoteDomains: [] + secrets: + rabbitmq: + username: {{ .Values.rabbitmqUsername }} + password: {{ .Values.rabbitmqPassword }} diff --git a/hack/helmfile.yaml b/hack/helmfile.yaml index 444bfa031d..d54d67d436 100644 --- a/hack/helmfile.yaml +++ b/hack/helmfile.yaml @@ -141,6 +141,9 @@ releases: value: {{ .Values.federationDomain1 }} - name: brig.config.optSettings.setFederationDomainConfigs[0].domain value: {{ .Values.federationDomain2 }} + - name: background-worker.config.remoteDomains + values: + - {{ .Values.federationDomain2 }} needs: - 'databases-ephemeral' @@ -159,5 +162,8 @@ releases: value: {{ .Values.federationDomain2 }} - name: brig.config.optSettings.setFederationDomainConfigs[0].domain value: {{ .Values.federationDomain1 }} + - name: background-worker.config.remoteDomains + values: + - {{ .Values.federationDomain1 }} needs: - 'databases-ephemeral' diff --git a/libs/extended/default.nix b/libs/extended/default.nix index f940aef369..7a70d62f53 100644 --- a/libs/extended/default.nix +++ b/libs/extended/default.nix @@ -4,6 +4,7 @@ # dependencies are added or removed. { mkDerivation , aeson +, amqp , base , bytestring , cassandra-util @@ -18,13 +19,16 @@ , imports , lib , metrics-wai +, monad-control , optparse-applicative , resourcet +, retry , servant , servant-server , servant-swagger , string-conversions , temporary +, text , tinylog , wai }: @@ -34,6 +38,7 @@ mkDerivation { src = gitignoreSource ./.; libraryHaskellDepends = [ aeson + amqp base bytestring cassandra-util @@ -44,12 +49,15 @@ mkDerivation { http-types imports metrics-wai + monad-control optparse-applicative resourcet + retry servant servant-server servant-swagger string-conversions + text tinylog wai ]; diff --git a/libs/extended/extended.cabal b/libs/extended/extended.cabal index bd04352e35..06c766a8f4 100644 --- a/libs/extended/extended.cabal +++ b/libs/extended/extended.cabal @@ -17,7 +17,9 @@ license-file: LICENSE build-type: Simple library + -- cabal-fmt: expand src exposed-modules: + Network.AMQP.Extended Options.Applicative.Extended Servant.API.Extended Servant.API.Extended.RawM @@ -74,6 +76,7 @@ library build-depends: aeson + , amqp , base , bytestring , cassandra-util @@ -84,12 +87,15 @@ library , http-types , imports , metrics-wai + , monad-control , optparse-applicative , resourcet + , retry , servant , servant-server , servant-swagger , string-conversions + , text , tinylog , wai diff --git a/libs/extended/src/Network/AMQP/Extended.hs b/libs/extended/src/Network/AMQP/Extended.hs new file mode 100644 index 0000000000..7a5665069c --- /dev/null +++ b/libs/extended/src/Network/AMQP/Extended.hs @@ -0,0 +1,98 @@ +{-# LANGUAGE NumericUnderscores #-} + +module Network.AMQP.Extended where + +import Control.Monad.Catch +import Control.Monad.Trans.Control +import Control.Retry +import qualified Data.Text as Text +import Imports +import qualified Network.AMQP as Q +import System.Logger (Logger) +import qualified System.Logger as Log + +data RabbitMqHooks m = RabbitMqHooks + { -- | Called whenever there is a new channel. At any time there should be at + -- max 1 open channel. Perhaps this would need to change in future. + onNewChannel :: Q.Channel -> m (), + -- | Called when connection is closed. Any exceptions thrown by this would + -- be logged and ignored. + onConnectionClose :: m (), + -- | Called when the channel is closed. Any exceptions thrown by this would + -- be logged and ignored. + onChannelException :: SomeException -> m () + } + +-- | Connects with RabbitMQ and opens a channel. If the channel is closed for +-- some reasons, reopens the channel. If the connection is closed for some +-- reasons, keeps retrying to connect until it works. +openConnectionWithRetries :: + forall m. + (MonadIO m, MonadMask m, MonadBaseControl IO m) => + Logger -> + String -> + Int -> + Text -> + RabbitMqHooks m -> + m () +openConnectionWithRetries l host port vHost hooks = do + username <- liftIO $ Text.pack <$> getEnv "RABBITMQ_USERNAME" + password <- liftIO $ Text.pack <$> getEnv "RABBITMQ_PASSWORD" + connectWithRetries username password + where + connectWithRetries :: Text -> Text -> m () + connectWithRetries username password = do + -- Jittered exponential backoff with 1ms as starting delay and 5s as max + -- delay. + let policy = capDelay 5_000_000 $ fullJitterBackoff 1000 + logError willRetry e retryStatus = do + Log.err l $ + Log.msg (Log.val "Failed to connect to RabbitMQ") + . Log.field "error" (displayException @SomeException e) + . Log.field "willRetry" willRetry + . Log.field "retryCount" retryStatus.rsIterNumber + recovering + policy + [logRetries (const $ pure True) logError] + ( const $ do + Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ") + connect username password + ) + + connect :: Text -> Text -> m () + connect username password = do + conn <- liftIO $ Q.openConnection' host (fromIntegral port) vHost username password + liftBaseWith $ \runInIO -> + Q.addConnectionClosedHandler conn True $ void $ runInIO $ do + hooks.onConnectionClose + `catch` logException l "onConnectionClose hook threw an exception, reconnecting to RabbitMQ anyway" + connectWithRetries username password + openChan conn + + openChan :: Q.Connection -> m () + openChan conn = do + Log.info l $ Log.msg (Log.val "Opening channel with RabbitMQ") + chan <- liftIO $ Q.openChannel conn + liftBaseWith $ \runInIO -> + Q.addChannelExceptionHandler chan (void . runInIO . chanExceptionHandler conn) + Log.info l $ Log.msg (Log.val "RabbitMQ channel opened") + hooks.onNewChannel chan + + chanExceptionHandler :: Q.Connection -> SomeException -> m () + chanExceptionHandler conn e = do + logException l "RabbitMQ channel closed" e + hooks.onChannelException e `catch` logException l "onChannelException hook threw an exception" + case (Q.isNormalChannelClose e, fromException e) of + (True, _) -> + Log.info l $ + Log.msg (Log.val "RabbitMQ channel is closed normally, not attempting to reopen channel") + (_, Just (Q.ConnectionClosedException {})) -> + Log.info l $ + Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel") + _ -> openChan conn + +logException :: (MonadIO m) => Logger -> String -> SomeException -> m () +logException l m (SomeException e) = do + Log.err l $ + Log.msg m + . Log.field "error" (displayException e) diff --git a/libs/wire-api-federation/default.nix b/libs/wire-api-federation/default.nix index 2f4e1c1df2..2ac0f43e54 100644 --- a/libs/wire-api-federation/default.nix +++ b/libs/wire-api-federation/default.nix @@ -5,6 +5,7 @@ { mkDerivation , aeson , aeson-pretty +, amqp , base , bytestring , bytestring-conversion @@ -49,6 +50,7 @@ mkDerivation { src = gitignoreSource ./.; libraryHaskellDepends = [ aeson + amqp base bytestring bytestring-conversion diff --git a/libs/wire-api-federation/src/Wire/API/Federation/API.hs b/libs/wire-api-federation/src/Wire/API/Federation/API.hs index d94f1c6096..f344a80ced 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/API.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/API.hs @@ -23,6 +23,7 @@ module Wire.API.Federation.API HasFedEndpoint, HasUnsafeFedEndpoint, fedClient, + fedQueueClient, fedClientIn, unsafeFedClientIn, module Wire.API.MakesFederatedCall, @@ -41,6 +42,7 @@ import Servant.Client.Core import Wire.API.Federation.API.Brig import Wire.API.Federation.API.Cargohold import Wire.API.Federation.API.Galley +import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client import Wire.API.MakesFederatedCall import Wire.API.Routes.Named @@ -75,6 +77,12 @@ fedClient :: Client m api fedClient = clientIn (Proxy @api) (Proxy @m) +fedQueueClient :: + forall (comp :: Component) (name :: Symbol) m api. + (HasFedEndpoint comp api name, HasClient m api, m ~ FedQueueClient comp) => + Client m api +fedQueueClient = clientIn (Proxy @api) (Proxy @m) + fedClientIn :: forall (comp :: Component) (name :: Symbol) m api. (HasFedEndpoint comp api name, HasClient m api) => diff --git a/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs new file mode 100644 index 0000000000..44693f3d11 --- /dev/null +++ b/libs/wire-api-federation/src/Wire/API/Federation/BackendNotifications.hs @@ -0,0 +1,168 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE RecordWildCards #-} + +module Wire.API.Federation.BackendNotifications where + +import Control.Exception +import Control.Monad.Except +import Data.Aeson +import qualified Data.ByteString.Builder as Builder +import qualified Data.ByteString.Lazy as LBS +import Data.Domain +import qualified Data.Map as Map +import qualified Data.Sequence as Seq +import qualified Data.Text as Text +import Data.Text.Encoding +import qualified Data.Text.Lazy.Encoding as TL +import Imports +import qualified Network.AMQP as Q +import qualified Network.AMQP.Types as Q +import Network.HTTP.Types +import Servant +import Servant.Client +import Servant.Client.Core +import Servant.Types.SourceT +import Wire.API.Federation.API.Common +import Wire.API.Federation.Client +import Wire.API.Federation.Component +import Wire.API.Federation.Error +import Wire.API.RawJson + +-- | NOTE: Stored in RabbitMQ, any changes to serialization of this object could cause +-- notifications to get lost. +data BackendNotification = BackendNotification + { ownDomain :: Domain, + targetComponent :: Component, + path :: Text, + -- | Using RawJson here allows the backend notification pusher to not parse + -- this body, which could be very large and completely useless to the + -- pusher. This also makes development less clunky as we don't have to + -- create a sum type here for all types of notifications that could exist. + body :: RawJson + } + deriving (Show, Eq) + +instance ToJSON BackendNotification where + toJSON notif = + object + [ "ownDomain" .= notif.ownDomain, + "targetComponent" .= notif.targetComponent, + "path" .= notif.path, + "body" .= TL.decodeUtf8 notif.body.rawJsonBytes + ] + +instance FromJSON BackendNotification where + parseJSON = withObject "BackendNotification" $ \o -> + BackendNotification + <$> o .: "ownDomain" + <*> o .: "targetComponent" + <*> o .: "path" + <*> (RawJson . TL.encodeUtf8 <$> o .: "body") + +type BackendNotificationAPI = Capture "name" Text :> ReqBody '[JSON] RawJson :> Post '[JSON] EmptyResponse + +sendNotification :: FederatorClientEnv -> Component -> Text -> RawJson -> IO (Either FederatorClientError ()) +sendNotification env component path body = + case component of + Brig -> go @'Brig + Galley -> go @'Galley + Cargohold -> go @'Cargohold + where + withoutFirstSlash :: Text -> Text + withoutFirstSlash (Text.stripPrefix "/" -> Just t) = t + withoutFirstSlash t = t + + go :: forall c. KnownComponent c => IO (Either FederatorClientError ()) + go = + runFederatorClient env . void $ + clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body + +enqueue :: Q.Channel -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c () -> IO () +enqueue channel originDomain targetDomain deliveryMode (FedQueueClient action) = + runReaderT action FedQueueEnv {..} + +routingKey :: Domain -> Text +routingKey d = "backend-notifications." <> domainText d + +-- | If you ever change this function and modify +-- queue parameters, know that it will start failing in the +-- next release! So be prepared to write migrations. +ensureQueue :: Q.Channel -> Domain -> IO () +ensureQueue chan domain = do + let opts = + Q.QueueOpts + { Q.queueName = routingKey domain, + Q.queuePassive = False, + Q.queueDurable = True, + Q.queueExclusive = False, + Q.queueAutoDelete = False, + Q.queueHeaders = + Q.FieldTable $ + Map.fromList + [ ("x-single-active-consumer", Q.FVBool True), + ("x-queue-type", Q.FVString "quorum") + ] + } + void $ Q.declareQueue chan opts + +-- * Internal machinery + +-- | Reads a servant request and puts the information in relevant RabbitMQ +-- queue. Perhaps none of this should be servant code anymore. But it is here to +-- allow smooth transition to RabbitMQ based notification pushing. +-- +-- Use 'Wire.API.Federation.API.fedQueueClient' to create and action and pass it +-- to 'enqueue' +newtype FedQueueClient c a = FedQueueClient (ReaderT FedQueueEnv IO a) + deriving (Functor, Applicative, Monad, MonadIO, MonadReader FedQueueEnv) + +data FedQueueEnv = FedQueueEnv + { channel :: Q.Channel, + originDomain :: Domain, + targetDomain :: Domain, + deliveryMode :: Q.DeliveryMode + } + +data EnqueueError = EnqueueError String + deriving (Show) + +instance Exception EnqueueError + +instance KnownComponent c => RunClient (FedQueueClient c) where + runRequestAcceptStatus :: Maybe [Status] -> Request -> FedQueueClient c Response + runRequestAcceptStatus _ req = do + env <- ask + bodyLBS <- case requestBody req of + Just (RequestBodyLBS lbs, _) -> pure lbs + Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs) + Just (RequestBodySource src, _) -> liftIO $ do + errOrRes <- runExceptT $ runSourceT src + either (throwIO . EnqueueError) (pure . mconcat) errOrRes + Nothing -> pure mempty + let notif = + BackendNotification + { ownDomain = env.originDomain, + targetComponent = componentVal @c, + path = decodeUtf8 $ LBS.toStrict $ Builder.toLazyByteString req.requestPath, + body = RawJson bodyLBS + } + let msg = + Q.newMsg + { Q.msgBody = encode notif, + Q.msgDeliveryMode = Just (env.deliveryMode), + Q.msgContentType = Just "application/json" + } + -- Empty string means default exchange + exchange = "" + liftIO $ do + ensureQueue env.channel env.targetDomain + void $ Q.publishMsg env.channel exchange (routingKey env.targetDomain) msg + pure $ + Response + { responseHttpVersion = http20, + responseStatusCode = status200, + responseHeaders = Seq.singleton (hContentType, "application/json"), + responseBody = "{}" + } + throwClientError :: ClientError -> FedQueueClient c a + throwClientError = liftIO . throwIO diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs index 2fab754002..445c40d732 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs @@ -195,11 +195,15 @@ withHTTP2StreamingRequest successfulStatus req handleResponse = do Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs) Just (RequestBodySource _, _) -> throwError FederatorClientStreamingNotSupported Nothing -> pure mempty - let req' = + let headers = + toList (requestHeaders req) + <> [(originDomainHeaderName, toByteString' (ceOriginDomain env))] + <> [(HTTP.hAccept, HTTP.renderHeader (toList $ req.requestAccept))] + req' = HTTP2.requestBuilder (requestMethod req) (LBS.toStrict (toLazyByteString path)) - (toList (requestHeaders req) <> [(originDomainHeaderName, toByteString' (ceOriginDomain env))]) + headers (lazyByteString body) let Endpoint (Text.encodeUtf8 -> hostname) (fromIntegral -> port) = ceFederator env resp <- diff --git a/libs/wire-api-federation/wire-api-federation.cabal b/libs/wire-api-federation/wire-api-federation.cabal index 460a8b65be..fd8da8f07f 100644 --- a/libs/wire-api-federation/wire-api-federation.cabal +++ b/libs/wire-api-federation/wire-api-federation.cabal @@ -21,6 +21,7 @@ library Wire.API.Federation.API.Cargohold Wire.API.Federation.API.Common Wire.API.Federation.API.Galley + Wire.API.Federation.BackendNotifications Wire.API.Federation.Client Wire.API.Federation.Component Wire.API.Federation.Domain @@ -80,6 +81,7 @@ library build-depends: aeson >=2.0.1.0 + , amqp , base >=4.6 && <5.0 , bytestring , bytestring-conversion diff --git a/libs/wire-api/src/Wire/API/MakesFederatedCall.hs b/libs/wire-api/src/Wire/API/MakesFederatedCall.hs index 0e0f627db4..6edc1b6d63 100644 --- a/libs/wire-api/src/Wire/API/MakesFederatedCall.hs +++ b/libs/wire-api/src/Wire/API/MakesFederatedCall.hs @@ -31,11 +31,12 @@ module Wire.API.MakesFederatedCall ) where -import Data.Aeson (Value (..)) +import Data.Aeson import Data.Constraint import Data.Kind import Data.Metrics.Servant import Data.Proxy +import Data.Schema import Data.Swagger.Operation (addExtensions) import qualified Data.Text as T import GHC.TypeLits @@ -78,6 +79,16 @@ data Component | Cargohold deriving (Show, Eq, Generic) deriving (Arbitrary) via (GenericUniform Component) + deriving (ToJSON, FromJSON) via (Schema Component) + +instance ToSchema Component where + schema = + enum @Text "Component" $ + mconcat + [ element "brig" Brig, + element "galley" Galley, + element "cargohold" Cargohold + ] -- | A typeclass corresponding to calls to federated services. This class has -- no methods, and exists only to automatically propagate information up to diff --git a/libs/wire-api/src/Wire/API/RawJson.hs b/libs/wire-api/src/Wire/API/RawJson.hs index 5a3f621589..3f2d209698 100644 --- a/libs/wire-api/src/Wire/API/RawJson.hs +++ b/libs/wire-api/src/Wire/API/RawJson.hs @@ -36,6 +36,9 @@ newtype RawJson = RawJson {rawJsonBytes :: LByteString} instance {-# OVERLAPPING #-} MimeUnrender JSON RawJson where mimeUnrender _ = pure . RawJson +instance MimeRender JSON RawJson where + mimeRender _ = rawJsonBytes + instance Swagger.ToSchema RawJson where declareNamedSchema _ = pure . Swagger.NamedSchema (Just "RawJson") $ diff --git a/nix/local-haskell-packages.nix b/nix/local-haskell-packages.nix index 0a2fb873aa..73d30174da 100644 --- a/nix/local-haskell-packages.nix +++ b/nix/local-haskell-packages.nix @@ -35,6 +35,7 @@ wire-api = hself.callPackage ../libs/wire-api/default.nix { inherit gitignoreSource; }; wire-message-proto-lens = hself.callPackage ../libs/wire-message-proto-lens/default.nix { inherit gitignoreSource; }; zauth = hself.callPackage ../libs/zauth/default.nix { inherit gitignoreSource; }; + background-worker = hself.callPackage ../services/background-worker/default.nix { inherit gitignoreSource; }; brig = hself.callPackage ../services/brig/default.nix { inherit gitignoreSource; }; cannon = hself.callPackage ../services/cannon/default.nix { inherit gitignoreSource; }; cargohold = hself.callPackage ../services/cargohold/default.nix { inherit gitignoreSource; }; diff --git a/nix/overlay.nix b/nix/overlay.nix index 2d73416ebd..3bcd85b5a1 100644 --- a/nix/overlay.nix +++ b/nix/overlay.nix @@ -101,5 +101,5 @@ self: super: { inherit (super) stdenv fetchurl; }; - rabbitmqadmin = super.callPackage ./pkgs/rabbitmqadmin {}; + rabbitmqadmin = super.callPackage ./pkgs/rabbitmqadmin { }; } diff --git a/nix/pkgs/rabbitmqadmin/default.nix b/nix/pkgs/rabbitmqadmin/default.nix index 0aa51355b1..f659aa34f6 100644 --- a/nix/pkgs/rabbitmqadmin/default.nix +++ b/nix/pkgs/rabbitmqadmin/default.nix @@ -1,4 +1,4 @@ -{stdenv, python3, fetchgit}: +{ stdenv, python3, fetchgit }: stdenv.mkDerivation rec { name = "rabbitmqadmin"; version = "3.11.13"; @@ -9,7 +9,7 @@ stdenv.mkDerivation rec { sha256 = "sha256-lbOuxJz66xlGlgodbz8Xlb3hcaewVFMqf9R/5XlqaAY="; }; - propagatedBuildInputs = [python3]; + propagatedBuildInputs = [ python3 ]; dontBuild = true; diff --git a/nix/wire-server.nix b/nix/wire-server.nix index b39279b296..6ee3771a6b 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -85,6 +85,7 @@ let inconsistencies = [ "inconsistencies" ]; api-simulations = [ "api-smoketest" "api-loadtest" ]; zauth = [ "zauth" ]; + background-worker = [ "background-worker" ]; integration = [ "integration" ]; }; diff --git a/services/background-worker/LICENSE b/services/background-worker/LICENSE new file mode 100644 index 0000000000..dba13ed2dd --- /dev/null +++ b/services/background-worker/LICENSE @@ -0,0 +1,661 @@ + GNU AFFERO GENERAL PUBLIC LICENSE + Version 3, 19 November 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU Affero General Public License is a free, copyleft license for +software and other kinds of works, specifically designed to ensure +cooperation with the community in the case of network server software. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +our General Public Licenses are intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + Developers that use our General Public Licenses protect your rights +with two steps: (1) assert copyright on the software, and (2) offer +you this License which gives you legal permission to copy, distribute +and/or modify the software. + + A secondary benefit of defending all users' freedom is that +improvements made in alternate versions of the program, if they +receive widespread use, become available for other developers to +incorporate. Many developers of free software are heartened and +encouraged by the resulting cooperation. However, in the case of +software used on network servers, this result may fail to come about. +The GNU General Public License permits making a modified version and +letting the public access it on a server without ever releasing its +source code to the public. + + The GNU Affero General Public License is designed specifically to +ensure that, in such cases, the modified source code becomes available +to the community. It requires the operator of a network server to +provide the source code of the modified version running there to the +users of that server. Therefore, public use of a modified version, on +a publicly accessible server, gives the public access to the source +code of the modified version. + + An older license, called the Affero General Public License and +published by Affero, was designed to accomplish similar goals. This is +a different license, not a version of the Affero GPL, but Affero has +released a new version of the Affero GPL which permits relicensing under +this license. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU Affero General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. + + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Remote Network Interaction; Use with the GNU General Public License. + + Notwithstanding any other provision of this License, if you modify the +Program, your modified version must prominently offer all users +interacting with it remotely through a computer network (if your version +supports such interaction) an opportunity to receive the Corresponding +Source of your version by providing access to the Corresponding Source +from a network server at no charge, through some standard or customary +means of facilitating copying of software. This Corresponding Source +shall include the Corresponding Source for any work covered by version 3 +of the GNU General Public License that is incorporated pursuant to the +following paragraph. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the work with which it is combined will remain governed by version +3 of the GNU General Public License. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU Affero General Public License from time to time. Such new versions +will be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU Affero General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU Affero General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU Affero General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + +Also add information on how to contact you by electronic and paper mail. + + If your software can interact with users remotely through a computer +network, you should also make sure that it provides a way for users to +get its source. For example, if your program is a web application, its +interface could display a "Source" link that leads users to an archive +of the code. There are many ways you could offer source, and different +solutions will be better for different programs; see section 13 for the +specific requirements. + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU AGPL, see +. diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal new file mode 100644 index 0000000000..bbe3e41eec --- /dev/null +++ b/services/background-worker/background-worker.cabal @@ -0,0 +1,210 @@ +cabal-version: 1.24 +name: background-worker +version: 0.1.0.0 +synopsis: Runs background work +license: AGPL-3 +license-file: LICENSE +author: Wire Swiss GmbH +maintainer: backend@wire.com +category: Network +build-type: Simple + +library + -- cabal-fmt: expand src + exposed-modules: + Wire.BackendNotificationPusher + Wire.BackgroundWorker + Wire.BackgroundWorker.Env + Wire.BackgroundWorker.Options + + hs-source-dirs: src + default-language: Haskell2010 + ghc-options: + -O2 -Wall -Wincomplete-uni-patterns -Wincomplete-record-updates + -Wpartial-fields -fwarn-tabs -optP-Wno-nonportable-include-path + -funbox-strict-fields -Wredundant-constraints -Wunused-packages + + build-depends: + aeson + , amqp + , exceptions + , extended + , HsOpenSSL + , http2-manager + , imports + , monad-control + , retry + , tinylog + , transformers-base + , types-common + , wire-api-federation + + default-extensions: + NoImplicitPrelude + AllowAmbiguousTypes + BangPatterns + ConstraintKinds + DataKinds + DefaultSignatures + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + DerivingStrategies + DerivingVia + DuplicateRecordFields + EmptyCase + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + InstanceSigs + KindSignatures + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + OverloadedRecordDot + OverloadedStrings + PackageImports + PatternSynonyms + PolyKinds + QuasiQuotes + RankNTypes + ScopedTypeVariables + StandaloneDeriving + TupleSections + TypeApplications + TypeFamilies + TypeFamilyDependencies + TypeOperators + UndecidableInstances + ViewPatterns + +executable background-worker + main-is: Main.hs + build-depends: + background-worker + , HsOpenSSL + , imports + , types-common + + hs-source-dirs: exec + default-language: Haskell2010 + ghc-options: + -O2 -Wall -Wincomplete-uni-patterns -Wincomplete-record-updates + -Wpartial-fields -fwarn-tabs -optP-Wno-nonportable-include-path + -funbox-strict-fields -Wredundant-constraints -Wunused-packages + + default-extensions: + NoImplicitPrelude + AllowAmbiguousTypes + BangPatterns + ConstraintKinds + DataKinds + DefaultSignatures + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + DerivingStrategies + DerivingVia + DuplicateRecordFields + EmptyCase + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + InstanceSigs + KindSignatures + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + OverloadedRecordDot + OverloadedStrings + PackageImports + PatternSynonyms + PolyKinds + QuasiQuotes + RankNTypes + ScopedTypeVariables + StandaloneDeriving + TupleSections + TypeApplications + TypeFamilies + TypeFamilyDependencies + TypeOperators + UndecidableInstances + ViewPatterns + +test-suite background-worker-test + default-language: Haskell2010 + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: Main.hs + ghc-options: + -O2 -Wall -Wincomplete-uni-patterns -Wincomplete-record-updates + -Wpartial-fields -fwarn-tabs -optP-Wno-nonportable-include-path + -funbox-strict-fields -threaded -with-rtsopts=-N + -Wredundant-constraints -Wunused-packages + + -- cabal-fmt: expand test + other-modules: + Main + Test.Wire.BackendNotificationPusherSpec + + build-depends: + aeson + , amqp + , background-worker + , federator + , hspec + , imports + , QuickCheck + , tinylog + , types-common + , wire-api + , wire-api-federation + + default-extensions: + NoImplicitPrelude + AllowAmbiguousTypes + BangPatterns + ConstraintKinds + DataKinds + DefaultSignatures + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + DerivingStrategies + DerivingVia + DuplicateRecordFields + EmptyCase + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + InstanceSigs + KindSignatures + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + OverloadedRecordDot + OverloadedStrings + PackageImports + PatternSynonyms + PolyKinds + QuasiQuotes + RankNTypes + ScopedTypeVariables + StandaloneDeriving + TupleSections + TypeApplications + TypeFamilies + TypeFamilyDependencies + TypeOperators + UndecidableInstances + ViewPatterns diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml new file mode 100644 index 0000000000..c2dd54bfb6 --- /dev/null +++ b/services/background-worker/background-worker.integration.yaml @@ -0,0 +1,13 @@ +logLevel: Info + +federatorInternal: + host: 127.0.0.1 + port: 8097 + +rabbitmq: + host: 127.0.0.1 + port: 5672 + vHost: / + +remoteDomains: + - b.example.com diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix new file mode 100644 index 0000000000..c737582585 --- /dev/null +++ b/services/background-worker/default.nix @@ -0,0 +1,63 @@ +# WARNING: GENERATED FILE, DO NOT EDIT. +# This file is generated by running hack/bin/generate-local-nix-packages.sh and +# must be regenerated whenever local packages are added or removed, or +# dependencies are added or removed. +{ mkDerivation +, aeson +, amqp +, exceptions +, extended +, federator +, gitignoreSource +, HsOpenSSL +, hspec +, http2-manager +, imports +, lib +, monad-control +, QuickCheck +, retry +, tinylog +, transformers-base +, types-common +, wire-api +, wire-api-federation +}: +mkDerivation { + pname = "background-worker"; + version = "0.1.0.0"; + src = gitignoreSource ./.; + isLibrary = true; + isExecutable = true; + libraryHaskellDepends = [ + aeson + amqp + exceptions + extended + HsOpenSSL + http2-manager + imports + monad-control + retry + tinylog + transformers-base + types-common + wire-api-federation + ]; + executableHaskellDepends = [ HsOpenSSL imports types-common ]; + testHaskellDepends = [ + aeson + amqp + federator + hspec + imports + QuickCheck + tinylog + types-common + wire-api + wire-api-federation + ]; + description = "Runs background work"; + license = lib.licenses.agpl3Only; + mainProgram = "background-worker"; +} diff --git a/services/background-worker/exec/Main.hs b/services/background-worker/exec/Main.hs new file mode 100644 index 0000000000..70e2554c12 --- /dev/null +++ b/services/background-worker/exec/Main.hs @@ -0,0 +1,13 @@ +module Main where + +import Imports +import OpenSSL (withOpenSSL) +import Util.Options +import Wire.BackgroundWorker + +main :: IO () +main = withOpenSSL $ do + let desc = "Background Worker" + defaultPath = "/etc/wire/background-worker/conf/background-worker.yaml" + options <- getOptions desc Nothing defaultPath + run options diff --git a/services/background-worker/src/Wire/BackendNotificationPusher.hs b/services/background-worker/src/Wire/BackendNotificationPusher.hs new file mode 100644 index 0000000000..741e5a5090 --- /dev/null +++ b/services/background-worker/src/Wire/BackendNotificationPusher.hs @@ -0,0 +1,90 @@ +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE RecordWildCards #-} + +module Wire.BackendNotificationPusher where + +import Control.Monad.Catch +import Control.Retry +import qualified Data.Aeson as A +import Data.Domain +import Imports +import qualified Network.AMQP as Q +import qualified Network.AMQP.Lifted as QL +import qualified System.Logger.Class as Log +import Wire.API.Federation.BackendNotifications +import Wire.API.Federation.Client +import Wire.BackgroundWorker.Env + +startPushingNotifications :: + Q.Channel -> + Domain -> + AppT IO Q.ConsumerTag +startPushingNotifications chan domain = do + lift $ ensureQueue chan domain + QL.consumeMsgs chan (routingKey domain) Q.Ack (pushNotification domain) + +-- | This class exists to help with testing, making the envelope in unit test is +-- too difficult. So we use fake envelopes in the unit tests. +class RabbitMQEnvelope e where + ack :: e -> IO () + reject :: e -> Bool -> IO () + +instance RabbitMQEnvelope Q.Envelope where + ack = Q.ackEnv + reject = Q.rejectEnv + +pushNotification :: RabbitMQEnvelope e => Domain -> (Q.Message, e) -> AppT IO () +pushNotification targetDomain (msg, envelope) = do + -- Jittered exponential backoff with 10ms as starting delay and + -- 300s as max delay. + -- + -- FUTUREWORK: Pull these numbers into config + let policy = capDelay 300_000_000 $ fullJitterBackoff 10000 + logErrr willRetry (SomeException e) rs = + Log.err $ + Log.msg (Log.val "Exception occurred while pushing notification") + . Log.field "error" (displayException e) + . Log.field "domain" (domainText targetDomain) + . Log.field "willRetry" willRetry + . Log.field "retryCount" rs.rsIterNumber + skipChanThreadKilled _ = Handler $ \(_ :: Q.ChanThreadKilledException) -> pure False + handlers = + skipAsyncExceptions + <> [ skipChanThreadKilled, + logRetries (const $ pure True) logErrr + ] + recovering policy handlers $ const go + where + go :: AppT IO () + go = case A.eitherDecode @BackendNotification (Q.msgBody msg) of + Left e -> do + Log.err $ + Log.msg (Log.val "Failed to parse notification, the notification will be ignored") + . Log.field "domain" (domainText targetDomain) + . Log.field "error" e + + -- FUTUREWORK: This rejects the message without any requeueing. This is + -- dangerous as it could happen that a new type of notification is + -- introduced and an old instance of this worker is running, in which case + -- the notification will just get dropped. On the other hand not dropping + -- this message blocks the whole queue. Perhaps there is a better way to + -- deal with this. + lift $ reject envelope False + Right notif -> do + ceFederator <- asks federatorInternal + ceHttp2Manager <- asks http2Manager + let ceOriginDomain = notif.ownDomain + ceTargetDomain = targetDomain + fcEnv = FederatorClientEnv {..} + liftIO $ either throwM pure =<< sendNotification fcEnv notif.targetComponent notif.path notif.body + lift $ ack envelope + +-- FUTUREWORK: Recosider using 1 channel for many consumers. It shouldn't matter +-- for a handful of remote domains. +startWorker :: [Domain] -> Q.Channel -> AppT IO () +startWorker remoteDomains chan = do + -- This ensures that we receive notifications 1 by 1 which ensures they are + -- delivered in order. + lift $ Q.qos chan 0 1 False + mapM_ (startPushingNotifications chan) remoteDomains + forever $ threadDelay maxBound diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs new file mode 100644 index 0000000000..fea244b963 --- /dev/null +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -0,0 +1,24 @@ +{-# LANGUAGE DisambiguateRecordFields #-} + +module Wire.BackgroundWorker where + +import Imports +import Network.AMQP.Extended +import qualified Wire.BackendNotificationPusher as BackendNotificationPusher +import Wire.BackgroundWorker.Env +import Wire.BackgroundWorker.Options + +-- FUTUREWORK: Start an http service with status and metrics endpoints +run :: Opts -> IO () +run opts = do + env <- mkEnv opts + -- FUTUREWORK: Make some way to tracking all the workers, currently there is + -- only one so we can just block on it. + openConnectionWithRetries env.logger opts.rabbitmq.host opts.rabbitmq.port opts.rabbitmq.vHost $ + RabbitMqHooks + { onNewChannel = runAppT env . BackendNotificationPusher.startWorker opts.remoteDomains, + -- FUTUREWORK: Use these for metrics + onChannelException = const $ pure (), + onConnectionClose = pure () + } + forever $ threadDelay maxBound diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs new file mode 100644 index 0000000000..0213e3549d --- /dev/null +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -0,0 +1,68 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE RecordWildCards #-} + +module Wire.BackgroundWorker.Env where + +import Control.Monad.Base +import Control.Monad.Catch +import Control.Monad.Trans.Control +import HTTP2.Client.Manager +import Imports +import OpenSSL.Session (SSLOption (..)) +import qualified OpenSSL.Session as SSL +import qualified System.Logger as Log +import System.Logger.Class +import qualified System.Logger.Extended as Log +import Util.Options +import Wire.BackgroundWorker.Options + +data Env = Env + { http2Manager :: Http2Manager, + logger :: Logger, + federatorInternal :: Endpoint + } + +mkEnv :: Opts -> IO Env +mkEnv opts = do + http2Manager <- initHttp2Manager + logger <- Log.mkLogger opts.logLevel Nothing opts.logFormat + let federatorInternal = opts.federatorInternal + pure Env {..} + +initHttp2Manager :: IO Http2Manager +initHttp2Manager = do + ctx <- SSL.context + SSL.contextAddOption ctx SSL_OP_NO_SSLv2 + SSL.contextAddOption ctx SSL_OP_NO_SSLv3 + SSL.contextAddOption ctx SSL_OP_NO_TLSv1 + SSL.contextSetCiphers ctx "HIGH" + SSL.contextSetVerificationMode ctx $ + SSL.VerifyPeer True True Nothing + SSL.contextSetDefaultVerifyPaths ctx + http2ManagerWithSSLCtx ctx + +newtype AppT m a where + AppT :: {unAppT :: ReaderT Env m a} -> AppT m a + deriving + ( Functor, + Applicative, + Monad, + MonadIO, + MonadCatch, + MonadThrow, + MonadMask, + MonadReader Env, + MonadTrans + ) + +deriving newtype instance MonadBase b m => MonadBase b (AppT m) + +deriving newtype instance MonadBaseControl b m => MonadBaseControl b (AppT m) + +instance MonadIO m => MonadLogger (AppT m) where + log lvl m = do + l <- asks logger + Log.log l lvl m + +runAppT :: Env -> AppT m a -> m a +runAppT env app = runReaderT (unAppT app) env diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs new file mode 100644 index 0000000000..f26edd8bc1 --- /dev/null +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -0,0 +1,27 @@ +module Wire.BackgroundWorker.Options where + +import Data.Aeson +import Data.Domain +import Imports +import System.Logger.Extended +import Util.Options + +data Opts = Opts + { logLevel :: !Level, + logFormat :: !(Maybe (Last LogFormat)), + federatorInternal :: !Endpoint, + rabbitmq :: !RabbitMqOpts, + remoteDomains :: [Domain] + } + deriving (Show, Generic) + +instance FromJSON Opts + +data RabbitMqOpts = RabbitMqOpts + { host :: !String, + port :: !Int, + vHost :: !Text + } + deriving (Show, Generic) + +instance FromJSON RabbitMqOpts diff --git a/services/background-worker/test/Main.hs b/services/background-worker/test/Main.hs new file mode 100644 index 0000000000..a824f8c30c --- /dev/null +++ b/services/background-worker/test/Main.hs @@ -0,0 +1 @@ +{-# OPTIONS_GHC -F -pgmF hspec-discover #-} diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs new file mode 100644 index 0000000000..ae6baeee53 --- /dev/null +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -0,0 +1,140 @@ +{-# LANGUAGE RecordWildCards #-} + +module Test.Wire.BackendNotificationPusherSpec where + +import qualified Data.Aeson as Aeson +import Data.Domain +import Data.Range +import Federator.MockServer +import Imports +import qualified Network.AMQP as Q +import qualified System.Logger as Logger +import Test.Hspec +import Test.QuickCheck +import Util.Options +import Wire.API.Federation.API +import Wire.API.Federation.API.Brig +import Wire.API.Federation.API.Common +import Wire.API.Federation.BackendNotifications +import Wire.API.RawJson +import Wire.BackendNotificationPusher +import Wire.BackgroundWorker.Env + +runTestAppT :: AppT IO a -> Int -> IO a +runTestAppT app port = do + http2Manager <- initHttp2Manager + logger <- Logger.new Logger.defSettings + let federatorInternal = Endpoint "localhost" (fromIntegral port) + env = Env {..} + runAppT env app + +spec :: Spec +spec = describe "Wire.BackendNotificationPusher" $ do + it "should push notifications" $ do + let returnSuccess _ = pure ("application/json", Aeson.encode EmptyResponse) + let origDomain = Domain "origin.example.com" + targetDomain = Domain "target.example.com" + -- Just using 'arbitrary' could generate a very big list, making tests very + -- slow. Make me wonder if notification pusher should even try to parse the + -- actual content, seems like wasted compute power. + notifContent <- generate $ UserDeletedConnectionsNotification <$> arbitrary <*> (unsafeRange . (: []) <$> arbitrary) + let notif = + BackendNotification + { targetComponent = Brig, + ownDomain = origDomain, + path = "/on-user-deleted-connections", + body = RawJson $ Aeson.encode notifContent + } + envelope <- newFakeEnvelope + let msg = + Q.newMsg + { Q.msgBody = Aeson.encode notif, + Q.msgContentType = Just "application/json" + } + + (_, fedReqs) <- + withTempMockFederator [] returnSuccess . runTestAppT $ do + pushNotification targetDomain (msg, envelope) + + readIORef envelope.acks `shouldReturn` 1 + readIORef envelope.rejections `shouldReturn` [] + fedReqs + `shouldBe` [ FederatedRequest + { frTargetDomain = targetDomain, + frOriginDomain = origDomain, + frComponent = Brig, + frRPC = "on-user-deleted-connections", + frBody = Aeson.encode notifContent + } + ] + + it "should reject invalid notifications" $ do + let returnSuccess _ = pure ("application/json", Aeson.encode EmptyResponse) + envelope <- newFakeEnvelope + let msg = + Q.newMsg + { Q.msgBody = "unparseable notification", + Q.msgContentType = Just "application/json" + } + (_, fedReqs) <- + withTempMockFederator [] returnSuccess . runTestAppT $ + pushNotification (Domain "target.example.com") (msg, envelope) + + readIORef envelope.acks `shouldReturn` 0 + readIORef envelope.rejections `shouldReturn` [False] + fedReqs `shouldBe` [] + + it "should retry failed deliveries" $ do + isFirstReqRef <- newIORef True + let returnSuccessSecondTime _ = + atomicModifyIORef isFirstReqRef $ \isFirstReq -> + if isFirstReq + then (False, ("text/html", "down for maintenance")) + else (False, ("application/json", Aeson.encode EmptyResponse)) + origDomain = Domain "origin.example.com" + targetDomain = Domain "target.example.com" + notifContent <- generate $ UserDeletedConnectionsNotification <$> arbitrary <*> (unsafeRange . (: []) <$> arbitrary) + let notif = + BackendNotification + { targetComponent = Brig, + ownDomain = origDomain, + path = "/on-user-deleted-connections", + body = RawJson $ Aeson.encode notifContent + } + envelope <- newFakeEnvelope + let msg = + Q.newMsg + { Q.msgBody = Aeson.encode notif, + Q.msgContentType = Just "application/json" + } + + (_, fedReqs) <- + withTempMockFederator [] returnSuccessSecondTime . runTestAppT $ do + pushNotification targetDomain (msg, envelope) + + readIORef envelope.acks `shouldReturn` 1 + readIORef envelope.rejections `shouldReturn` [] + let expectedReq = + FederatedRequest + { frTargetDomain = targetDomain, + frOriginDomain = origDomain, + frComponent = Brig, + frRPC = "on-user-deleted-connections", + frBody = Aeson.encode notifContent + } + fedReqs `shouldBe` [expectedReq, expectedReq] + +instance RabbitMQEnvelope FakeEnvelope where + ack e = atomicModifyIORef' e.acks $ \a -> (a + 1, ()) + reject e requeueFlag = atomicModifyIORef' e.rejections $ \r -> (r <> [requeueFlag], ()) + +data FakeEnvelope = FakeEnvelope + { rejections :: IORef [Bool], + acks :: IORef Int + } + +newFakeEnvelope :: IO FakeEnvelope +newFakeEnvelope = + FakeEnvelope + <$> newIORef [] + <*> newIORef 0 diff --git a/services/brig/brig.cabal b/services/brig/brig.cabal index 7935b59004..4ff603d903 100644 --- a/services/brig/brig.cabal +++ b/services/brig/brig.cabal @@ -191,6 +191,7 @@ library , amazonka-dynamodb >=2 , amazonka-ses >=2 , amazonka-sqs >=2 + , amqp , async >=2.1 , auto-update >=0.1 , base >=4 && <5 @@ -213,7 +214,6 @@ library , data-timeout >=0.3 , dns , dns-util - , either >=4.3 , enclosed-exceptions >=1.0 , errors >=1.4 , exceptions >=0.5 @@ -269,7 +269,6 @@ library , schema-profunctor , scientific >=0.3.4 , servant - , servant-client , servant-server , servant-swagger , servant-swagger-ui diff --git a/services/brig/default.nix b/services/brig/default.nix index bad459003c..fa72a654fe 100644 --- a/services/brig/default.nix +++ b/services/brig/default.nix @@ -9,6 +9,7 @@ , amazonka-dynamodb , amazonka-ses , amazonka-sqs +, amqp , async , attoparsec , auto-update @@ -34,7 +35,6 @@ , data-timeout , dns , dns-util -, either , email-validate , enclosed-exceptions , errors @@ -171,6 +171,7 @@ mkDerivation { amazonka-dynamodb amazonka-ses amazonka-sqs + amqp async auto-update base @@ -193,7 +194,6 @@ mkDerivation { data-timeout dns dns-util - either enclosed-exceptions errors exceptions @@ -249,7 +249,6 @@ mkDerivation { schema-profunctor scientific servant - servant-client servant-server servant-swagger servant-swagger-ui diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs index 584eb4deaf..bb27faea32 100644 --- a/services/brig/src/Brig/App.hs +++ b/services/brig/src/Brig/App.hs @@ -1,4 +1,6 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE StrictData #-} {-# LANGUAGE TemplateHaskell #-} -- FUTUREWORK: Get rid of this option once Polysemy is fully introduced to Brig @@ -60,6 +62,7 @@ module Brig.App emailSender, randomPrekeyLocalLock, keyPackageLocalLock, + rabbitmqChannel, fsWatcher, -- * App Monad @@ -132,6 +135,9 @@ import Data.Yaml (FromJSON) import qualified Database.Bloodhound as ES import HTTP2.Client.Manager (Http2Manager, http2ManagerWithSSLCtx) import Imports +import qualified Network.AMQP as Q +import Network.AMQP.Extended (RabbitMqHooks (RabbitMqHooks)) +import qualified Network.AMQP.Extended as Q import Network.HTTP.Client (responseTimeoutMicro) import Network.HTTP.Client.OpenSSL import OpenSSL.EVP.Digest (Digest, getDigestByName) @@ -191,7 +197,8 @@ data Env = Env _digestMD5 :: Digest, _indexEnv :: IndexEnv, _randomPrekeyLocalLock :: Maybe (MVar ()), - _keyPackageLocalLock :: MVar () + _keyPackageLocalLock :: MVar (), + _rabbitmqChannel :: MVar Q.Channel } makeLenses ''Env @@ -244,6 +251,7 @@ newEnv o = do Log.info lgr $ Log.msg (Log.val "randomPrekeys: not active; using dynamoDB instead.") pure Nothing kpLock <- newMVar () + rabbitChan <- mkRabbitMqChannel lgr o pure $! Env { _cargohold = mkEndpoint $ Opt.cargohold o, @@ -279,7 +287,8 @@ newEnv o = do _digestSHA256 = sha256, _indexEnv = mkIndexEnv o lgr mgr mtr (Opt.galley o), _randomPrekeyLocalLock = prekeyLocalLock, - _keyPackageLocalLock = kpLock + _keyPackageLocalLock = kpLock, + _rabbitmqChannel = rabbitChan } where emailConn _ (Opt.EmailAWS aws) = pure (Just aws, Nothing) @@ -295,6 +304,17 @@ newEnv o = do pure (Nothing, Just smtp) mkEndpoint service = RPC.host (encodeUtf8 (service ^. epHost)) . RPC.port (service ^. epPort) $ RPC.empty +mkRabbitMqChannel :: Logger -> Opts -> IO (MVar Q.Channel) +mkRabbitMqChannel l (Opt.rabbitmq -> Opt.RabbitMqOpts {..}) = do + chan <- newEmptyMVar + Q.openConnectionWithRetries l host port vHost $ + RabbitMqHooks + { onNewChannel = putMVar chan, + onChannelException = \_ -> void $ tryTakeMVar chan, + onConnectionClose = void $ tryTakeMVar chan + } + pure chan + mkIndexEnv :: Opts -> Logger -> Manager -> Metrics -> Endpoint -> IndexEnv mkIndexEnv o lgr mgr mtr galleyEndpoint = let bhe = ES.mkBHEnv (ES.Server (Opt.url (Opt.elasticsearch o))) mgr diff --git a/services/brig/src/Brig/Federation/Client.hs b/services/brig/src/Brig/Federation/Client.hs index c7ee6561c1..f90740b048 100644 --- a/services/brig/src/Brig/Federation/Client.hs +++ b/services/brig/src/Brig/Federation/Client.hs @@ -1,4 +1,5 @@ {-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} @@ -25,18 +26,23 @@ module Brig.Federation.Client where import Brig.App import Control.Lens import Control.Monad +import Control.Monad.Catch (MonadMask, throwM) import Control.Monad.Trans.Except (ExceptT (..), throwE) +import Control.Retry +import Control.Timeout import Data.Domain import Data.Handle import Data.Id (ClientId, UserId) import Data.Qualified import Data.Range (Range) import qualified Data.Text as T +import Data.Time.Units import Imports -import Servant.Client hiding (client) +import qualified Network.AMQP as Q import qualified System.Logger.Class as Log import Wire.API.Federation.API import Wire.API.Federation.API.Brig as FederatedBrig +import Wire.API.Federation.BackendNotifications import Wire.API.Federation.Client import Wire.API.Federation.Error import Wire.API.User @@ -137,18 +143,38 @@ sendConnectionAction self (tUntagged -> other) action = do notifyUserDeleted :: ( MonadReader Env m, MonadIO m, - HasFedEndpoint 'Brig api "on-user-deleted-connections", - HasClient (FederatorClient 'Brig) api + MonadMask m, + Log.MonadLogger m ) => Local UserId -> Remote (Range 1 1000 [UserId]) -> - ExceptT FederationError m () + m () notifyUserDeleted self remotes = do let remoteConnections = tUnqualified remotes - void $ - runBrigFederatorClient (tDomain remotes) $ - fedClient @'Brig @"on-user-deleted-connections" $ - UserDeletedConnectionsNotification (tUnqualified self) remoteConnections + let notif = UserDeletedConnectionsNotification (tUnqualified self) remoteConnections + enqueueNotification (tDomain self) (tDomain remotes) Q.Persistent $ void $ fedQueueClient @'Brig @"on-user-deleted-connections" notif + +-- | Enqueues notifications in RabbitMQ. Retries 3 times with a delay of 1s. +enqueueNotification :: (MonadReader Env m, MonadIO m, MonadMask m, Log.MonadLogger m) => Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c () -> m () +enqueueNotification ownDomain remoteDomain deliveryMode action = + recovering (limitRetries 3 <> constantDelay 1_000_000) [logRetries (const $ pure True) logError] (const go) + where + logError willRetry (SomeException e) status = do + Log.err $ + Log.msg @Text "failed to enqueue notification in RabbitMQ" + . Log.field "error" (displayException e) + . Log.field "willRetry" willRetry + . Log.field "retryCount" status.rsIterNumber + go = do + mChan <- timeout (1 :: Second) (readMVar =<< view rabbitmqChannel) + case mChan of + Nothing -> throwM NoRabbitMqChannel + Just chan -> liftIO $ enqueue chan ownDomain remoteDomain deliveryMode action + +data NoRabbitMqChannel = NoRabbitMqChannel + deriving (Show) + +instance Exception NoRabbitMqChannel runBrigFederatorClient :: (MonadReader Env m, MonadIO m) => diff --git a/services/brig/src/Brig/IO/Intra.hs b/services/brig/src/Brig/IO/Intra.hs index 3c98f8ab9a..204488de25 100644 --- a/services/brig/src/Brig/IO/Intra.hs +++ b/services/brig/src/Brig/IO/Intra.hs @@ -75,8 +75,6 @@ import qualified Data.Aeson.KeyMap as KeyMap import Data.ByteString.Conversion import qualified Data.ByteString.Lazy as BL import qualified Data.Conduit.List as C -import Data.Domain -import Data.Either.Combinators (whenLeft) import Data.Id import Data.Json.Util ((#)) import Data.List.Split (chunksOf) @@ -296,7 +294,8 @@ notifyUserDeletionRemotes :: forall m. ( MonadReader Env m, MonadClient m, - MonadLogger m + MonadLogger m, + MonadMask m ) => UserId -> m () @@ -317,17 +316,7 @@ notifyUserDeletionRemotes deleted = do pure () Just rangedUids -> do luidDeleted <- qualifyLocal deleted - eitherFErr <- runExceptT (notifyUserDeleted luidDeleted (qualifyAs uids rangedUids)) - whenLeft eitherFErr $ - logFederationError (tDomain uids) - - logFederationError :: Domain -> FederationError -> m () - logFederationError domain fErr = - Log.err $ - Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString) - . Log.field "user_id" (show deleted) - . Log.field "domain" (domainText domain) - . Log.field "error" (show fErr) + notifyUserDeleted luidDeleted (qualifyAs uids rangedUids) -- | Push events to other users. push :: diff --git a/services/brig/src/Brig/Options.hs b/services/brig/src/Brig/Options.hs index 66904d3ce0..aaf0d5c62b 100644 --- a/services/brig/src/Brig/Options.hs +++ b/services/brig/src/Brig/Options.hs @@ -95,6 +95,15 @@ data ElasticSearchOpts = ElasticSearchOpts instance FromJSON ElasticSearchOpts +data RabbitMqOpts = RabbitMqOpts + { host :: !String, + port :: !Int, + vHost :: !Text + } + deriving (Show, Generic) + +instance FromJSON RabbitMqOpts + data AWSOpts = AWSOpts { -- | Event journal queue for user events -- (e.g. user deletion) @@ -433,6 +442,8 @@ data Opts = Opts cassandra :: !CassandraOpts, -- | ElasticSearch settings elasticsearch :: !ElasticSearchOpts, + -- | RabbitMQ settings + rabbitmq :: !RabbitMqOpts, -- | AWS settings aws :: !AWSOpts, -- | Enable Random Prekey Strategy diff --git a/services/brig/test/integration/API/User/Account.hs b/services/brig/test/integration/API/User/Account.hs index 664a17d964..030196e303 100644 --- a/services/brig/test/integration/API/User/Account.hs +++ b/services/brig/test/integration/API/User/Account.hs @@ -87,9 +87,6 @@ import Wire.API.Asset hiding (Asset) import qualified Wire.API.Asset as Asset import Wire.API.Connection import Wire.API.Conversation -import Wire.API.Federation.API.Brig (UserDeletedConnectionsNotification (..)) -import qualified Wire.API.Federation.API.Brig as FedBrig -import Wire.API.Federation.API.Common (EmptyResponse (EmptyResponse)) import Wire.API.Internal.Notification import Wire.API.Routes.MultiTablePaging import Wire.API.Team.Feature (ExposeInvitationURLsToTeamAdminConfig (..), FeatureStatus (..), FeatureTTL' (..), LockStatus (LockStatusLocked), withStatus) @@ -157,8 +154,6 @@ tests _ at opts p b c ch g aws userJournalWatcher = test p "delete/by-code" $ testDeleteUserByCode b, test p "delete/anonymous" $ testDeleteAnonUser b, test p "delete with profile pic" $ testDeleteWithProfilePic b ch, - test p "delete with connected remote users" $ testDeleteWithRemotes opts b, - test p "delete with connected remote users and failed remote notifcations" $ testDeleteWithRemotesAndFailedNotifications opts b c, test p "put /i/users/:uid/sso-id" $ testUpdateSSOId b g, testGroup "temporary customer extensions" @@ -1479,96 +1474,6 @@ testDeleteWithProfilePic brig cargohold = do -- Check that the asset gets deleted downloadAsset cargohold uid (ast ^. Asset.assetKey) !!! const 404 === statusCode -testDeleteWithRemotes :: Opt.Opts -> Brig -> Http () -testDeleteWithRemotes opts brig = do - localUser <- randomUser brig - - let remote1Domain = Domain "remote1.example.com" - remote2Domain = Domain "remote2.example.com" - remote1UserConnected <- Qualified <$> randomId <*> pure remote1Domain - remote1UserPending <- Qualified <$> randomId <*> pure remote1Domain - remote2UserBlocked <- Qualified <$> randomId <*> pure remote2Domain - - sendConnectionAction brig opts (userId localUser) remote1UserConnected (Just FedBrig.RemoteConnect) Accepted - sendConnectionAction brig opts (userId localUser) remote1UserPending Nothing Sent - sendConnectionAction brig opts (userId localUser) remote2UserBlocked (Just FedBrig.RemoteConnect) Accepted - void $ putConnectionQualified brig (userId localUser) remote2UserBlocked Blocked - - let fedMockResponse _ = pure (Aeson.encode EmptyResponse) - let galleyHandler :: ReceivedRequest -> MockT IO Wai.Response - galleyHandler (ReceivedRequest requestMethod requestPath _requestBody) = - case (requestMethod, requestPath) of - (_methodDelete, ["i", "user"]) -> do - let response = Wai.responseLBS Http.status200 [(Http.hContentType, "application/json")] (cs $ Aeson.encode EmptyResponse) - pure response - _ -> error "not mocked" - - (_, rpcCalls, _galleyCalls) <- liftIO $ - withMockedFederatorAndGalley opts (Domain "example.com") fedMockResponse galleyHandler $ do - deleteUser (userId localUser) (Just defPassword) brig !!! do - const 200 === statusCode - - liftIO $ do - remote1Call <- assertOne $ filter (\c -> frTargetDomain c == remote1Domain) rpcCalls - remote1Udn <- assertRight $ parseFedRequest remote1Call - udcnUser remote1Udn @?= userId localUser - sort (fromRange (udcnConnections remote1Udn)) - @?= sort (map qUnqualified [remote1UserConnected, remote1UserPending]) - - remote2Call <- assertOne $ filter (\c -> frTargetDomain c == remote2Domain) rpcCalls - remote2Udn <- assertRight $ parseFedRequest remote2Call - udcnUser remote2Udn @?= userId localUser - fromRange (udcnConnections remote2Udn) @?= [qUnqualified remote2UserBlocked] - where - parseFedRequest :: FromJSON a => FederatedRequest -> Either String a - parseFedRequest = eitherDecode . frBody - -testDeleteWithRemotesAndFailedNotifications :: Opt.Opts -> Brig -> Cannon -> Http () -testDeleteWithRemotesAndFailedNotifications opts brig cannon = do - alice <- randomUser brig - alex <- randomUser brig - let localDomain = qDomain (userQualifiedId alice) - - let bDomain = Domain "b.example.com" - cDomain = Domain "c.example.com" - bob <- Qualified <$> randomId <*> pure bDomain - carl <- Qualified <$> randomId <*> pure cDomain - - postConnection brig (userId alice) (userId alex) !!! const 201 === statusCode - putConnection brig (userId alex) (userId alice) Accepted !!! const 200 === statusCode - sendConnectionAction brig opts (userId alice) bob (Just FedBrig.RemoteConnect) Accepted - sendConnectionAction brig opts (userId alice) carl (Just FedBrig.RemoteConnect) Accepted - - let fedMockResponse req = - if frTargetDomain req == bDomain - then throw $ MockErrorResponse Http.status500 "mocked connection problem with b domain" - else pure (Aeson.encode EmptyResponse) - - let galleyHandler :: ReceivedRequest -> MockT IO Wai.Response - galleyHandler (ReceivedRequest requestMethod requestPath _requestBody) = - case (Http.parseMethod requestMethod, requestPath) of - (Right Http.DELETE, ["i", "user"]) -> do - let response = Wai.responseLBS Http.status200 [(Http.hContentType, "application/json")] (cs $ Aeson.encode EmptyResponse) - pure response - _ -> error "not mocked" - - (_, rpcCalls, _galleyCalls) <- WS.bracketR cannon (userId alex) $ \wsAlex -> do - let action = withMockedFederatorAndGalley opts localDomain fedMockResponse galleyHandler $ do - deleteUser (userId alice) (Just defPassword) brig !!! do - const 200 === statusCode - liftIO action <* do - void . liftIO . WS.assertMatch (5 # Second) wsAlex $ matchDeleteUserNotification (userQualifiedId alice) - - liftIO $ do - rRpc <- assertOne $ filter (\c -> frTargetDomain c == cDomain) rpcCalls - cUdn <- assertRight $ parseFedRequest rRpc - udcnUser cUdn @?= userId alice - sort (fromRange (udcnConnections cUdn)) - @?= sort (map qUnqualified [carl]) - where - parseFedRequest :: FromJSON a => FederatedRequest -> Either String a - parseFedRequest = eitherDecode . frBody - testUpdateSSOId :: Brig -> Galley -> Http () testUpdateSSOId brig galley = do noSuchUserId <- Id <$> liftIO UUID.nextRandom diff --git a/services/run-services b/services/run-services index 47d1c555bc..d4957298cb 100755 --- a/services/run-services +++ b/services/run-services @@ -386,6 +386,7 @@ FEDERATOR = Service("federator", Colors.BLUE, check_status=False).with_level(LEVEL) STERN = Service("stern", Colors.YELLOW).with_level(LEVEL) PROXY = Service("proxy", Colors.RED).with_level(LEVEL) +BACKGROUND_WORKER = Service("background-worker", Colors.RED, check_status=False).with_level(LEVEL) NGINZ = Nginz(Colors.PURPLEISH) if __name__ == '__main__': @@ -399,8 +400,8 @@ if __name__ == '__main__': 'AWS_REGION': "eu-west-1", 'AWS_ACCESS_KEY_ID': "dummykey", 'AWS_SECRET_ACCESS_KEY': "dummysecret", - 'RABBITMQ_USERNAME': 'guest', - 'RABBITMQ_PASSWORD': 'alpaca-grapefruit' + 'RABBITMQ_USERNAME': os.environ.get("RABBITMQ_USERNAME"), + 'RABBITMQ_PASSWORD': os.environ.get("RABBITMQ_PASSWORD") } backend_a = [ @@ -414,6 +415,7 @@ if __name__ == '__main__': Instance(STERN, 8091), DummyInstance(PROXY, 8087), FederatorInstance(8097, 8098), + Instance(BACKGROUND_WORKER, 0), NginzInstance( local_port=8080, http2_port=8090, @@ -431,6 +433,7 @@ if __name__ == '__main__': Instance(SPAR, 9088), DummyInstance(PROXY, 9087), FederatorInstance(9097, 9098), + Instance(BACKGROUND_WORKER, 0), NginzInstance( local_port=9080, http2_port=9090,