diff --git a/changelog.d/6-federation/close-grpc-client b/changelog.d/6-federation/close-grpc-client new file mode 100644 index 0000000000..274b7be3c6 --- /dev/null +++ b/changelog.d/6-federation/close-grpc-client @@ -0,0 +1 @@ +Close GRPC client after making a request to a federator. diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs index 423c7788a5..608fac4d38 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs @@ -20,6 +20,7 @@ module Wire.API.Federation.Client where +import Control.Monad.Catch import Control.Monad.Except (ExceptT, MonadError (..), withExceptT) import Control.Monad.State (MonadState (..), StateT, evalStateT, gets) import Data.ByteString.Builder (toLazyByteString) @@ -27,13 +28,13 @@ import qualified Data.ByteString.Lazy as LBS import Data.Domain (Domain, domainText) import qualified Data.Text as T import Imports -import Mu.GRpc.Client.TyApps (GRpcMessageProtocol (MsgProtoBuf), GRpcReply (..), GrpcClient, gRpcCall, grpcClientConfigSimple) +import Mu.GRpc.Client.TyApps import qualified Network.HTTP.Types as HTTP import Servant.Client (ResponseF (..)) import qualified Servant.Client as Servant import Servant.Client.Core (RequestBody (..), RequestF (..), RunClient (..)) import Util.Options (Endpoint (..)) -import Wire.API.Federation.GRPC.Client (createGrpcClient, reason) +import Wire.API.Federation.GRPC.Client import qualified Wire.API.Federation.GRPC.Types as Proto -- FUTUREWORK: Remove originDomain from here and make it part of all the API @@ -50,7 +51,10 @@ newtype FederatorClient (component :: Proto.Component) m a = FederatorClient {ru deriving newtype (Functor, Applicative, Monad, MonadReader FederatorClientEnv, MonadState (Maybe ByteString), MonadIO) runFederatorClientWith :: Monad m => GrpcClient -> Domain -> Domain -> FederatorClient component m a -> m a -runFederatorClientWith client targetDomain originDomain = flip evalStateT Nothing . flip runReaderT (FederatorClientEnv client targetDomain originDomain) . runFederatorClient +runFederatorClientWith client targetDomain originDomain = + flip evalStateT Nothing + . flip runReaderT (FederatorClientEnv client targetDomain originDomain) + . runFederatorClient class KnownComponent (c :: Proto.Component) where componentVal :: Proto.Component @@ -167,11 +171,12 @@ mkFederatorClient = do >>= either (throwError . FederationUnavailable . reason) pure executeFederated :: - (MonadIO m, HasFederatorConfig m) => + (MonadIO m, MonadMask m, HasFederatorConfig m) => Domain -> FederatorClient component (ExceptT FederationClientFailure m) a -> ExceptT FederationError m a executeFederated targetDomain action = do - federatorClient <- mkFederatorClient originDomain <- lift federationDomain - withExceptT FederationCallFailure (runFederatorClientWith federatorClient targetDomain originDomain action) + bracket mkFederatorClient closeGrpcClient $ \federatorClient -> + withExceptT FederationCallFailure $ + runFederatorClientWith federatorClient targetDomain originDomain action diff --git a/libs/wire-api-federation/src/Wire/API/Federation/GRPC/Client.hs b/libs/wire-api-federation/src/Wire/API/Federation/GRPC/Client.hs index ec6a2aa44c..5e745e8524 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/GRPC/Client.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/GRPC/Client.hs @@ -18,11 +18,13 @@ module Wire.API.Federation.GRPC.Client ( GrpcClientErr (..), createGrpcClient, + closeGrpcClient, grpcClientError, ) where import Control.Exception +import Control.Monad.Except import qualified Data.Text as T import Imports import Mu.GRpc.Client.Record (setupGrpcClient') @@ -41,6 +43,11 @@ createGrpcClient cfg = do Right (Left err) -> Left (grpcClientError (Just cfg) err) Right (Right client) -> Right client +-- | Close federator client and ignore errors, since the only possible error +-- here is EarlyEndOfStream, which should not concern us at this point. +closeGrpcClient :: MonadIO m => GrpcClient -> m () +closeGrpcClient = void . liftIO . runExceptT . close + grpcClientError :: Exception e => Maybe GrpcClientConfig -> e -> GrpcClientErr grpcClientError mcfg err = GrpcClientErr . T.pack $ diff --git a/services/federator/src/Federator/Remote.hs b/services/federator/src/Federator/Remote.hs index 1fe137d803..5324529818 100644 --- a/services/federator/src/Federator/Remote.hs +++ b/services/federator/src/Federator/Remote.hs @@ -43,7 +43,6 @@ import Mu.GRpc.Client.Optics (GRpcReply) import Mu.GRpc.Client.Record (GRpcMessageProtocol (MsgProtoBuf)) import Mu.GRpc.Client.TyApps (gRpcCall) import Network.GRPC.Client.Helpers -import Network.HTTP2.Client.Exceptions import Network.TLS as TLS import qualified Network.TLS.Extra.Cipher as TLS import Polysemy @@ -86,7 +85,7 @@ interpretRemote = interpret $ \case target <- Polysemy.mapError (RemoteErrorDiscoveryFailure vDomain) $ discoverFederatorWithError vDomain - Polysemy.bracket (mkGrpcClient target) (closeGrpcClient target) $ \client -> + Polysemy.bracket (mkGrpcClient target) (embed @IO . closeGrpcClient) $ \client -> callInward client vRequest callInward :: MonadIO m => GrpcClient -> Request -> m (GRpcReply InwardResponse) @@ -158,20 +157,6 @@ mkGrpcClient target@(SrvTarget host port) = do . Polysemy.fromEither =<< Polysemy.fromExceptionVia (RemoteErrorTLSException target) (createGrpcClient cfg') -closeGrpcClient :: - Members '[Embed IO, Polysemy.Error RemoteError] r => - SrvTarget -> - GrpcClient -> - Sem r () -closeGrpcClient target = - Polysemy.mapError handle - . Polysemy.fromEitherM - . runExceptT - . close - where - handle :: ClientError -> RemoteError - handle = RemoteErrorClientFailure target . grpcClientError Nothing - logRemoteErrors :: Members '[Polysemy.Error RemoteError, TinyLog] r => Sem r x -> diff --git a/services/federator/test/integration/Test/Federator/IngressSpec.hs b/services/federator/test/integration/Test/Federator/IngressSpec.hs index 8d87ff53fa..2820598747 100644 --- a/services/federator/test/integration/Test/Federator/IngressSpec.hs +++ b/services/federator/test/integration/Test/Federator/IngressSpec.hs @@ -19,6 +19,7 @@ module Test.Federator.IngressSpec where import Bilge import Control.Lens (view, (^.)) +import Control.Monad.Catch import Data.Aeson import qualified Data.ByteString.Lazy as LBS import Data.Default (def) @@ -43,7 +44,7 @@ import Test.Federator.Util import Test.Hspec import Test.Tasty.HUnit (assertFailure) import Util.Options (Endpoint (Endpoint)) -import Wire.API.Federation.GRPC.Client (createGrpcClient) +import Wire.API.Federation.GRPC.Client (closeGrpcClient, createGrpcClient) import Wire.API.Federation.GRPC.Types hiding (body, path) import qualified Wire.API.Federation.GRPC.Types as GRPC import Wire.API.User @@ -98,24 +99,34 @@ spec env = GRpcErrorString err -> err `shouldBe` "GRPC status indicates failure: status-code=INTERNAL, status-message=\"HTTP Status 400\"" _ -> assertFailure $ "Expect HTTP 400, got: " <> show grpcReply -inwardBrigCallViaIngress :: (MonadIO m, MonadHttp m, MonadReader TestEnv m, HasCallStack) => ByteString -> LBS.ByteString -> m (GRpcReply InwardResponse) +inwardBrigCallViaIngress :: + ( MonadIO m, + MonadMask m, + MonadHttp m, + MonadReader TestEnv m, + HasCallStack + ) => + ByteString -> + LBS.ByteString -> + m (GRpcReply InwardResponse) inwardBrigCallViaIngress requestPath payload = do Endpoint ingressHost ingressPort <- cfgNginxIngress . view teTstOpts <$> ask let target = SrvTarget (cs ingressHost) ingressPort runSettings <- optSettings . view teOpts <$> ask tlsSettings <- view teTLSSettings - c <- - liftIO - . Polysemy.runM - . Polysemy.runError @RemoteError - . discardLogs - . Polysemy.runInputConst tlsSettings - . Polysemy.runReader runSettings - $ mkGrpcClient target - client <- case c of - Left clientErr -> liftIO $ assertFailure (show clientErr) - Right cli -> pure cli - inwardBrigCallViaIngressWithClient client requestPath payload + bracket + ( liftIO + . Polysemy.runM + . Polysemy.runError @RemoteError + . discardLogs + . Polysemy.runInputConst tlsSettings + . Polysemy.runReader runSettings + $ mkGrpcClient target + ) + (either (const (pure ())) closeGrpcClient) + $ \case + Left clientErr -> liftIO $ assertFailure (show clientErr) + Right client -> inwardBrigCallViaIngressWithClient client requestPath payload inwardBrigCallViaIngressWithClient :: (MonadIO m, MonadHttp m, MonadReader TestEnv m, HasCallStack) => GrpcClient -> ByteString -> LBS.ByteString -> m (GRpcReply InwardResponse) inwardBrigCallViaIngressWithClient client requestPath payload = do diff --git a/services/federator/test/integration/Test/Federator/Util.hs b/services/federator/test/integration/Test/Federator/Util.hs index bf7835bcda..e80da9dd1e 100644 --- a/services/federator/test/integration/Test/Federator/Util.hs +++ b/services/federator/test/integration/Test/Federator/Util.hs @@ -69,7 +69,8 @@ newtype TestFederator m a = TestFederator {unwrapTestFederator :: ReaderT TestEn MonadReader TestEnv, MonadFail, MonadThrow, - MonadCatch + MonadCatch, + MonadMask ) instance MonadRandom m => MonadRandom (TestFederator m) where diff --git a/services/federator/test/unit/Test/Federator/Remote.hs b/services/federator/test/unit/Test/Federator/Remote.hs index 3535f859d1..61d105ed7f 100644 --- a/services/federator/test/unit/Test/Federator/Remote.hs +++ b/services/federator/test/unit/Test/Federator/Remote.hs @@ -20,6 +20,7 @@ module Test.Federator.Remote where import Data.Streaming.Network (bindRandomPortTCP) +import Federator.Env (TLSSettings) import Federator.Options import Federator.Remote import Federator.Run (mkTLSSettingsOrThrow) @@ -31,11 +32,13 @@ import qualified Network.Wai.Handler.WarpTLS as WarpTLS import Polysemy import qualified Polysemy.Error as Polysemy import qualified Polysemy.Input as Polysemy +import qualified Polysemy.Resource as Polysemy import Test.Federator.Options (defRunSettings) import Test.Tasty import Test.Tasty.HUnit import UnliftIO (bracket, timeout) import qualified UnliftIO.Async as Async +import Wire.API.Federation.GRPC.Client import Wire.Network.DNS.SRV (SrvTarget (SrvTarget)) tests :: TestTree @@ -59,16 +62,24 @@ settings = remoteCAStore = Just "test/resources/unit/unit-ca.pem" } -assertNoError :: - forall e r x. - (Show e, Member (Embed IO) r) => - Sem (Polysemy.Error e ': r) x -> - Sem r x +assertNoError :: IO (Either RemoteError x) -> IO x assertNoError action = - Polysemy.runError action >>= \case - Left err -> embed . assertFailure $ "Unexpected error: " <> show err + action >>= \case + Left err -> assertFailure $ "Unexpected error: " <> show err Right x -> pure x +mkTestGrpcClient :: TLSSettings -> Int -> IO (Either RemoteError ()) +mkTestGrpcClient tlsSettings port = + Polysemy.runM + . Polysemy.runResource + . Polysemy.runError + . Polysemy.runInputConst tlsSettings + $ do + Polysemy.bracket + (mkGrpcClient (SrvTarget "localhost" (fromIntegral port))) + (embed @IO . closeGrpcClient) + (const (pure ())) + testValidatesCertificateSuccess :: TestTree testValidatesCertificateSuccess = testGroup @@ -76,20 +87,16 @@ testValidatesCertificateSuccess = [ testCase "when hostname=localhost and certificate-for=localhost" $ do bracket (startMockServer certForLocalhost) (\(serverThread, _) -> Async.cancel serverThread) $ \(_, port) -> do tlsSettings <- mkTLSSettingsOrThrow settings - void . Polysemy.runM . assertNoError @RemoteError . Polysemy.runInputConst tlsSettings $ mkGrpcClient (SrvTarget "localhost" (fromIntegral port)), + assertNoError (mkTestGrpcClient tlsSettings port), testCase "when hostname=localhost. and certificate-for=localhost" $ do bracket (startMockServer certForLocalhost) (\(serverThread, _) -> Async.cancel serverThread) $ \(_, port) -> do tlsSettings <- mkTLSSettingsOrThrow settings - void . Polysemy.runM . assertNoError @RemoteError . Polysemy.runInputConst tlsSettings $ mkGrpcClient (SrvTarget "localhost." (fromIntegral port)), + assertNoError (mkTestGrpcClient tlsSettings port), -- This is a limitation of the TLS library, this test just exists to document that. testCase "when hostname=localhost. and certificate-for=localhost." $ do bracket (startMockServer certForLocalhostDot) (\(serverThread, _) -> Async.cancel serverThread) $ \(_, port) -> do tlsSettings <- mkTLSSettingsOrThrow settings - eitherClient <- - Polysemy.runM - . Polysemy.runError @RemoteError - . Polysemy.runInputConst tlsSettings - $ mkGrpcClient (SrvTarget "localhost." (fromIntegral port)) + eitherClient <- mkTestGrpcClient tlsSettings port case eitherClient of Left _ -> pure () Right _ -> assertFailure "Congratulations, you fixed a known issue!" @@ -102,11 +109,7 @@ testValidatesCertificateWrongHostname = [ testCase "when the server's certificate doesn't match the hostname" $ bracket (startMockServer certForWrongDomain) (Async.cancel . fst) $ \(_, port) -> do tlsSettings <- mkTLSSettingsOrThrow settings - eitherClient <- - Polysemy.runM - . Polysemy.runError - . Polysemy.runInputConst tlsSettings - $ mkGrpcClient (SrvTarget "localhost." (fromIntegral port)) + eitherClient <- mkTestGrpcClient tlsSettings port case eitherClient of Left (RemoteErrorTLSException _ _) -> pure () Left x -> assertFailure $ "Expected TLS failure, got: " <> show x @@ -114,11 +117,7 @@ testValidatesCertificateWrongHostname = testCase "when the server's certificate does not have the server key usage flag" $ bracket (startMockServer certWithoutServerKeyUsage) (Async.cancel . fst) $ \(_, port) -> do tlsSettings <- mkTLSSettingsOrThrow settings - eitherClient <- - Polysemy.runM - . Polysemy.runError - . Polysemy.runInputConst tlsSettings - $ mkGrpcClient (SrvTarget "localhost." (fromIntegral port)) + eitherClient <- mkTestGrpcClient tlsSettings port case eitherClient of Left (RemoteErrorTLSException _ _) -> pure () Left x -> assertFailure $ "Expected TLS failure, got: " <> show x diff --git a/services/galley/src/Galley/API/Internal.hs b/services/galley/src/Galley/API/Internal.hs index efec177340..a3636b6d7b 100644 --- a/services/galley/src/Galley/API/Internal.hs +++ b/services/galley/src/Galley/API/Internal.hs @@ -27,7 +27,6 @@ where import Control.Exception.Safe (catchAny) import Control.Lens hiding ((.=)) -import Control.Monad.Except (runExceptT) import Data.Data (Proxy (Proxy)) import Data.Id as Id import Data.List1 (maybeList1) @@ -57,6 +56,7 @@ import qualified Galley.Data.Conversation as Data import Galley.Effects import Galley.Effects.ClientStore import Galley.Effects.ConversationStore +import Galley.Effects.FederatorAccess import Galley.Effects.GundeckAccess import Galley.Effects.MemberStore import Galley.Effects.Paging @@ -90,7 +90,6 @@ import Wire.API.Conversation (ConvIdsPage, pattern GetPaginatedConversationIds) import Wire.API.ErrorDescription (MissingLegalholdConsent) import Wire.API.Federation.API.Galley (UserDeletedConversationsNotification (UserDeletedConversationsNotification)) import qualified Wire.API.Federation.API.Galley as FedGalley -import Wire.API.Federation.Client (executeFederated) import Wire.API.Routes.MultiTablePaging (mtpHasMore, mtpPagingState, mtpResults) import Wire.API.Routes.MultiVerb (MultiVerb, RespondEmpty) import Wire.API.Routes.Public (ZOptConn, ZUser) @@ -564,7 +563,7 @@ rmUser user conn = do for_ (bucketRemote (fromRange cids)) $ \remoteConvs -> do let userDelete = UserDeletedConversationsNotification (tUnqualified lusr) (unsafeRange (tUnqualified remoteConvs)) let rpc = FedGalley.onUserDeleted FedGalley.clientRoutes (tDomain lusr) userDelete - res <- runExceptT (executeFederated (tDomain remoteConvs) rpc) + res <- liftSem $ runFederatedEither remoteConvs rpc case res of -- FUTUREWORK: Add a retry mechanism if there are federation errrors. -- See https://wearezeta.atlassian.net/browse/SQCORE-1091 diff --git a/services/galley/src/Galley/Intra/Federator/Types.hs b/services/galley/src/Galley/Intra/Federator/Types.hs index 44f43d5321..40c0b89268 100644 --- a/services/galley/src/Galley/Intra/Federator/Types.hs +++ b/services/galley/src/Galley/Intra/Federator/Types.hs @@ -25,6 +25,7 @@ module Galley.Intra.Federator.Types where import Control.Lens +import Control.Monad.Catch import Control.Monad.Except import Galley.Env import Galley.Options @@ -43,7 +44,10 @@ newtype FederationM a = FederationM Monad, MonadIO, MonadReader Env, - MonadUnliftIO + MonadUnliftIO, + MonadThrow, + MonadCatch, + MonadMask ) runFederationM :: Env -> FederationM a -> IO a