Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6-federation/close-grpc-client
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Close GRPC client after making a request to a federator.
17 changes: 11 additions & 6 deletions libs/wire-api-federation/src/Wire/API/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@

module Wire.API.Federation.Client where

import Control.Monad.Catch
import Control.Monad.Except (ExceptT, MonadError (..), withExceptT)
import Control.Monad.State (MonadState (..), StateT, evalStateT, gets)
import Data.ByteString.Builder (toLazyByteString)
import qualified Data.ByteString.Lazy as LBS
import Data.Domain (Domain, domainText)
import qualified Data.Text as T
import Imports
import Mu.GRpc.Client.TyApps (GRpcMessageProtocol (MsgProtoBuf), GRpcReply (..), GrpcClient, gRpcCall, grpcClientConfigSimple)
import Mu.GRpc.Client.TyApps
import qualified Network.HTTP.Types as HTTP
import Servant.Client (ResponseF (..))
import qualified Servant.Client as Servant
import Servant.Client.Core (RequestBody (..), RequestF (..), RunClient (..))
import Util.Options (Endpoint (..))
import Wire.API.Federation.GRPC.Client (createGrpcClient, reason)
import Wire.API.Federation.GRPC.Client
import qualified Wire.API.Federation.GRPC.Types as Proto

-- FUTUREWORK: Remove originDomain from here and make it part of all the API
Expand All @@ -50,7 +51,10 @@ newtype FederatorClient (component :: Proto.Component) m a = FederatorClient {ru
deriving newtype (Functor, Applicative, Monad, MonadReader FederatorClientEnv, MonadState (Maybe ByteString), MonadIO)

runFederatorClientWith :: Monad m => GrpcClient -> Domain -> Domain -> FederatorClient component m a -> m a
runFederatorClientWith client targetDomain originDomain = flip evalStateT Nothing . flip runReaderT (FederatorClientEnv client targetDomain originDomain) . runFederatorClient
runFederatorClientWith client targetDomain originDomain =
flip evalStateT Nothing
. flip runReaderT (FederatorClientEnv client targetDomain originDomain)
. runFederatorClient

class KnownComponent (c :: Proto.Component) where
componentVal :: Proto.Component
Expand Down Expand Up @@ -167,11 +171,12 @@ mkFederatorClient = do
>>= either (throwError . FederationUnavailable . reason) pure

executeFederated ::
(MonadIO m, HasFederatorConfig m) =>
(MonadIO m, MonadMask m, HasFederatorConfig m) =>
Domain ->
FederatorClient component (ExceptT FederationClientFailure m) a ->
ExceptT FederationError m a
executeFederated targetDomain action = do
federatorClient <- mkFederatorClient
originDomain <- lift federationDomain
withExceptT FederationCallFailure (runFederatorClientWith federatorClient targetDomain originDomain action)
bracket mkFederatorClient closeGrpcClient $ \federatorClient ->
withExceptT FederationCallFailure $
runFederatorClientWith federatorClient targetDomain originDomain action
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
module Wire.API.Federation.GRPC.Client
( GrpcClientErr (..),
createGrpcClient,
closeGrpcClient,
grpcClientError,
)
where

import Control.Exception
import Control.Monad.Except
import qualified Data.Text as T
import Imports
import Mu.GRpc.Client.Record (setupGrpcClient')
Expand All @@ -41,6 +43,11 @@ createGrpcClient cfg = do
Right (Left err) -> Left (grpcClientError (Just cfg) err)
Right (Right client) -> Right client

-- | Close federator client and ignore errors, since the only possible error
-- here is EarlyEndOfStream, which should not concern us at this point.
closeGrpcClient :: MonadIO m => GrpcClient -> m ()
closeGrpcClient = void . liftIO . runExceptT . close

grpcClientError :: Exception e => Maybe GrpcClientConfig -> e -> GrpcClientErr
grpcClientError mcfg err =
GrpcClientErr . T.pack $
Expand Down
17 changes: 1 addition & 16 deletions services/federator/src/Federator/Remote.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import Mu.GRpc.Client.Optics (GRpcReply)
import Mu.GRpc.Client.Record (GRpcMessageProtocol (MsgProtoBuf))
import Mu.GRpc.Client.TyApps (gRpcCall)
import Network.GRPC.Client.Helpers
import Network.HTTP2.Client.Exceptions
import Network.TLS as TLS
import qualified Network.TLS.Extra.Cipher as TLS
import Polysemy
Expand Down Expand Up @@ -86,7 +85,7 @@ interpretRemote = interpret $ \case
target <-
Polysemy.mapError (RemoteErrorDiscoveryFailure vDomain) $
discoverFederatorWithError vDomain
Polysemy.bracket (mkGrpcClient target) (closeGrpcClient target) $ \client ->
Polysemy.bracket (mkGrpcClient target) (embed @IO . closeGrpcClient) $ \client ->
callInward client vRequest

callInward :: MonadIO m => GrpcClient -> Request -> m (GRpcReply InwardResponse)
Expand Down Expand Up @@ -158,20 +157,6 @@ mkGrpcClient target@(SrvTarget host port) = do
. Polysemy.fromEither
=<< Polysemy.fromExceptionVia (RemoteErrorTLSException target) (createGrpcClient cfg')

closeGrpcClient ::
Members '[Embed IO, Polysemy.Error RemoteError] r =>
SrvTarget ->
GrpcClient ->
Sem r ()
closeGrpcClient target =
Polysemy.mapError handle
. Polysemy.fromEitherM
. runExceptT
. close
where
handle :: ClientError -> RemoteError
handle = RemoteErrorClientFailure target . grpcClientError Nothing

logRemoteErrors ::
Members '[Polysemy.Error RemoteError, TinyLog] r =>
Sem r x ->
Expand Down
39 changes: 25 additions & 14 deletions services/federator/test/integration/Test/Federator/IngressSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Test.Federator.IngressSpec where

import Bilge
import Control.Lens (view, (^.))
import Control.Monad.Catch
import Data.Aeson
import qualified Data.ByteString.Lazy as LBS
import Data.Default (def)
Expand All @@ -43,7 +44,7 @@ import Test.Federator.Util
import Test.Hspec
import Test.Tasty.HUnit (assertFailure)
import Util.Options (Endpoint (Endpoint))
import Wire.API.Federation.GRPC.Client (createGrpcClient)
import Wire.API.Federation.GRPC.Client (closeGrpcClient, createGrpcClient)
import Wire.API.Federation.GRPC.Types hiding (body, path)
import qualified Wire.API.Federation.GRPC.Types as GRPC
import Wire.API.User
Expand Down Expand Up @@ -98,24 +99,34 @@ spec env =
GRpcErrorString err -> err `shouldBe` "GRPC status indicates failure: status-code=INTERNAL, status-message=\"HTTP Status 400\""
_ -> assertFailure $ "Expect HTTP 400, got: " <> show grpcReply

inwardBrigCallViaIngress :: (MonadIO m, MonadHttp m, MonadReader TestEnv m, HasCallStack) => ByteString -> LBS.ByteString -> m (GRpcReply InwardResponse)
inwardBrigCallViaIngress ::
( MonadIO m,
MonadMask m,
MonadHttp m,
MonadReader TestEnv m,
HasCallStack
) =>
ByteString ->
LBS.ByteString ->
m (GRpcReply InwardResponse)
inwardBrigCallViaIngress requestPath payload = do
Endpoint ingressHost ingressPort <- cfgNginxIngress . view teTstOpts <$> ask
let target = SrvTarget (cs ingressHost) ingressPort
runSettings <- optSettings . view teOpts <$> ask
tlsSettings <- view teTLSSettings
c <-
liftIO
. Polysemy.runM
. Polysemy.runError @RemoteError
. discardLogs
. Polysemy.runInputConst tlsSettings
. Polysemy.runReader runSettings
$ mkGrpcClient target
client <- case c of
Left clientErr -> liftIO $ assertFailure (show clientErr)
Right cli -> pure cli
inwardBrigCallViaIngressWithClient client requestPath payload
bracket
( liftIO
. Polysemy.runM
. Polysemy.runError @RemoteError
. discardLogs
. Polysemy.runInputConst tlsSettings
. Polysemy.runReader runSettings
$ mkGrpcClient target
)
(either (const (pure ())) closeGrpcClient)
$ \case
Left clientErr -> liftIO $ assertFailure (show clientErr)
Right client -> inwardBrigCallViaIngressWithClient client requestPath payload

inwardBrigCallViaIngressWithClient :: (MonadIO m, MonadHttp m, MonadReader TestEnv m, HasCallStack) => GrpcClient -> ByteString -> LBS.ByteString -> m (GRpcReply InwardResponse)
inwardBrigCallViaIngressWithClient client requestPath payload = do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ newtype TestFederator m a = TestFederator {unwrapTestFederator :: ReaderT TestEn
MonadReader TestEnv,
MonadFail,
MonadThrow,
MonadCatch
MonadCatch,
MonadMask
)

instance MonadRandom m => MonadRandom (TestFederator m) where
Expand Down
47 changes: 23 additions & 24 deletions services/federator/test/unit/Test/Federator/Remote.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
module Test.Federator.Remote where

import Data.Streaming.Network (bindRandomPortTCP)
import Federator.Env (TLSSettings)
import Federator.Options
import Federator.Remote
import Federator.Run (mkTLSSettingsOrThrow)
Expand All @@ -31,11 +32,13 @@ import qualified Network.Wai.Handler.WarpTLS as WarpTLS
import Polysemy
import qualified Polysemy.Error as Polysemy
import qualified Polysemy.Input as Polysemy
import qualified Polysemy.Resource as Polysemy
import Test.Federator.Options (defRunSettings)
import Test.Tasty
import Test.Tasty.HUnit
import UnliftIO (bracket, timeout)
import qualified UnliftIO.Async as Async
import Wire.API.Federation.GRPC.Client
import Wire.Network.DNS.SRV (SrvTarget (SrvTarget))

tests :: TestTree
Expand All @@ -59,37 +62,41 @@ settings =
remoteCAStore = Just "test/resources/unit/unit-ca.pem"
}

assertNoError ::
forall e r x.
(Show e, Member (Embed IO) r) =>
Sem (Polysemy.Error e ': r) x ->
Sem r x
assertNoError :: IO (Either RemoteError x) -> IO x
assertNoError action =
Polysemy.runError action >>= \case
Left err -> embed . assertFailure $ "Unexpected error: " <> show err
action >>= \case
Left err -> assertFailure $ "Unexpected error: " <> show err
Right x -> pure x

mkTestGrpcClient :: TLSSettings -> Int -> IO (Either RemoteError ())
mkTestGrpcClient tlsSettings port =
Polysemy.runM
. Polysemy.runResource
. Polysemy.runError
. Polysemy.runInputConst tlsSettings
$ do
Polysemy.bracket
(mkGrpcClient (SrvTarget "localhost" (fromIntegral port)))
(embed @IO . closeGrpcClient)
(const (pure ()))

testValidatesCertificateSuccess :: TestTree
testValidatesCertificateSuccess =
testGroup
"can get response with valid certificate"
[ testCase "when hostname=localhost and certificate-for=localhost" $ do
bracket (startMockServer certForLocalhost) (\(serverThread, _) -> Async.cancel serverThread) $ \(_, port) -> do
tlsSettings <- mkTLSSettingsOrThrow settings
void . Polysemy.runM . assertNoError @RemoteError . Polysemy.runInputConst tlsSettings $ mkGrpcClient (SrvTarget "localhost" (fromIntegral port)),
assertNoError (mkTestGrpcClient tlsSettings port),
testCase "when hostname=localhost. and certificate-for=localhost" $ do
bracket (startMockServer certForLocalhost) (\(serverThread, _) -> Async.cancel serverThread) $ \(_, port) -> do
tlsSettings <- mkTLSSettingsOrThrow settings
void . Polysemy.runM . assertNoError @RemoteError . Polysemy.runInputConst tlsSettings $ mkGrpcClient (SrvTarget "localhost." (fromIntegral port)),
assertNoError (mkTestGrpcClient tlsSettings port),
-- This is a limitation of the TLS library, this test just exists to document that.
testCase "when hostname=localhost. and certificate-for=localhost." $ do
bracket (startMockServer certForLocalhostDot) (\(serverThread, _) -> Async.cancel serverThread) $ \(_, port) -> do
tlsSettings <- mkTLSSettingsOrThrow settings
eitherClient <-
Polysemy.runM
. Polysemy.runError @RemoteError
. Polysemy.runInputConst tlsSettings
$ mkGrpcClient (SrvTarget "localhost." (fromIntegral port))
eitherClient <- mkTestGrpcClient tlsSettings port
case eitherClient of
Left _ -> pure ()
Right _ -> assertFailure "Congratulations, you fixed a known issue!"
Expand All @@ -102,23 +109,15 @@ testValidatesCertificateWrongHostname =
[ testCase "when the server's certificate doesn't match the hostname" $
bracket (startMockServer certForWrongDomain) (Async.cancel . fst) $ \(_, port) -> do
tlsSettings <- mkTLSSettingsOrThrow settings
eitherClient <-
Polysemy.runM
. Polysemy.runError
. Polysemy.runInputConst tlsSettings
$ mkGrpcClient (SrvTarget "localhost." (fromIntegral port))
eitherClient <- mkTestGrpcClient tlsSettings port
case eitherClient of
Left (RemoteErrorTLSException _ _) -> pure ()
Left x -> assertFailure $ "Expected TLS failure, got: " <> show x
Right _ -> assertFailure "Expected connection with the server to fail",
testCase "when the server's certificate does not have the server key usage flag" $
bracket (startMockServer certWithoutServerKeyUsage) (Async.cancel . fst) $ \(_, port) -> do
tlsSettings <- mkTLSSettingsOrThrow settings
eitherClient <-
Polysemy.runM
. Polysemy.runError
. Polysemy.runInputConst tlsSettings
$ mkGrpcClient (SrvTarget "localhost." (fromIntegral port))
eitherClient <- mkTestGrpcClient tlsSettings port
case eitherClient of
Left (RemoteErrorTLSException _ _) -> pure ()
Left x -> assertFailure $ "Expected TLS failure, got: " <> show x
Expand Down
5 changes: 2 additions & 3 deletions services/galley/src/Galley/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ where

import Control.Exception.Safe (catchAny)
import Control.Lens hiding ((.=))
import Control.Monad.Except (runExceptT)
import Data.Data (Proxy (Proxy))
import Data.Id as Id
import Data.List1 (maybeList1)
Expand Down Expand Up @@ -57,6 +56,7 @@ import qualified Galley.Data.Conversation as Data
import Galley.Effects
import Galley.Effects.ClientStore
import Galley.Effects.ConversationStore
import Galley.Effects.FederatorAccess
import Galley.Effects.GundeckAccess
import Galley.Effects.MemberStore
import Galley.Effects.Paging
Expand Down Expand Up @@ -90,7 +90,6 @@ import Wire.API.Conversation (ConvIdsPage, pattern GetPaginatedConversationIds)
import Wire.API.ErrorDescription (MissingLegalholdConsent)
import Wire.API.Federation.API.Galley (UserDeletedConversationsNotification (UserDeletedConversationsNotification))
import qualified Wire.API.Federation.API.Galley as FedGalley
import Wire.API.Federation.Client (executeFederated)
import Wire.API.Routes.MultiTablePaging (mtpHasMore, mtpPagingState, mtpResults)
import Wire.API.Routes.MultiVerb (MultiVerb, RespondEmpty)
import Wire.API.Routes.Public (ZOptConn, ZUser)
Expand Down Expand Up @@ -564,7 +563,7 @@ rmUser user conn = do
for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do
let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs))
let rpc = FedGalley.onUserDeleted FedGalley.clientRoutes (tDomain lusr) userDelete
res <- runExceptT (executeFederated (tDomain remoteConvs) rpc)
res <- liftSem $ runFederatedEither remoteConvs rpc
case res of
-- FUTUREWORK: Add a retry mechanism if there are federation errrors.
-- See https://wearezeta.atlassian.net/browse/SQCORE-1091
Expand Down
6 changes: 5 additions & 1 deletion services/galley/src/Galley/Intra/Federator/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Galley.Intra.Federator.Types
where

import Control.Lens
import Control.Monad.Catch
import Control.Monad.Except
import Galley.Env
import Galley.Options
Expand All @@ -43,7 +44,10 @@ newtype FederationM a = FederationM
Monad,
MonadIO,
MonadReader Env,
MonadUnliftIO
MonadUnliftIO,
MonadThrow,
MonadCatch,
MonadMask
)

runFederationM :: Env -> FederationM a -> IO a
Expand Down