diff --git a/cabal.project b/cabal.project index 35f93fb100..94b8595b0a 100644 --- a/cabal.project +++ b/cabal.project @@ -135,7 +135,7 @@ source-repository-package source-repository-package type: git location: https://github.com/wireapp/http2 - tag: 1ee1ce432d923839dab6782410e91dc17df2a880 + tag: aa3501ad58e1abbd196781fac25a84f41ec2a787 source-repository-package type: git diff --git a/cabal.project.freeze b/cabal.project.freeze index 22013305b8..6e81f6e402 100644 --- a/cabal.project.freeze +++ b/cabal.project.freeze @@ -1691,7 +1691,6 @@ constraints: any.AC-Angle ==1.0, any.polyparse ==1.13, any.polysemy ==1.7.0.0, any.polysemy-check ==0.8.1.0, - any.polysemy-mocks ==0.2.0.0, any.polysemy-plugin ==0.4.2.0, any.pooled-io ==0.0.2.2, any.port-utils ==0.2.1.0, diff --git a/changelog.d/6-federation/federator-streaming b/changelog.d/6-federation/federator-streaming new file mode 100644 index 0000000000..7901572b90 --- /dev/null +++ b/changelog.d/6-federation/federator-streaming @@ -0,0 +1 @@ +Make federator capable of streaming responses diff --git a/libs/wire-api-federation/package.yaml b/libs/wire-api-federation/package.yaml index 6b4ff64da1..8fb6f5d300 100644 --- a/libs/wire-api-federation/package.yaml +++ b/libs/wire-api-federation/package.yaml @@ -25,6 +25,7 @@ dependencies: - http-types - http2 - imports +- kan-extensions - lifted-base - metrics-wai - mtl diff --git a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs index 4215def028..4dae263caf 100644 --- a/libs/wire-api-federation/src/Wire/API/Federation/Client.hs +++ b/libs/wire-api-federation/src/Wire/API/Federation/Client.hs @@ -22,13 +22,17 @@ module Wire.API.Federation.Client ( FederatorClientEnv (..), FederatorClient, runFederatorClient, + runFederatorClientToCodensity, performHTTP2Request, + withHTTP2Request, + streamingResponseStrictBody, headersFromTable, ) where import qualified Control.Exception as E import Control.Monad.Catch +import Control.Monad.Codensity import Control.Monad.Except import qualified Data.Aeson as Aeson import qualified Data.ByteString as BS @@ -52,6 +56,7 @@ import Network.TLS as TLS import qualified Network.Wai.Utilities.Error as Wai import Servant.Client import Servant.Client.Core +import Servant.Types.SourceT import qualified System.TimeManager import Util.Options (Endpoint (..)) import Wire.API.Federation.Component @@ -65,7 +70,7 @@ data FederatorClientEnv = FederatorClientEnv } newtype FederatorClient (c :: Component) a = FederatorClient - {unFederatorClient :: ReaderT FederatorClientEnv (ExceptT FederatorClientError IO) a} + {unFederatorClient :: ReaderT FederatorClientEnv (ExceptT FederatorClientError (Codensity IO)) a} deriving newtype ( Functor, Applicative, @@ -75,6 +80,9 @@ newtype FederatorClient (c :: Component) a = FederatorClient MonadIO ) +liftCodensity :: Codensity IO a -> FederatorClient c a +liftCodensity = FederatorClient . lift . lift + headersFromTable :: HTTP2.HeaderTable -> [HTTP.Header] headersFromTable (headerList, _) = flip map headerList $ \(token, headerValue) -> (HTTP2.tokenKey token, headerValue) @@ -90,93 +98,136 @@ performHTTP2Request :: HTTP2.Request -> ByteString -> Int -> - IO (Either FederatorClientHTTP2Error (HTTP.Status, [HTTP.Header], Builder)) -performHTTP2Request mtlsConfig req hostname port = do - let drainResponse resp = go mempty - where - go acc = do - chunk <- HTTP2.getResponseBodyChunk resp - if BS.null chunk - then pure acc - else go (acc <> byteString chunk) + IO (Either FederatorClientHTTP2Error (ResponseF Builder)) +performHTTP2Request mtlsConfig req hostname port = try $ do + withHTTP2Request mtlsConfig req hostname port $ \resp -> do + b <- + fmap (either (const mempty) id) + . runExceptT + . runSourceT + . responseBody + $ resp + pure $ resp $> foldMap byteString b + +withHTTP2Request :: + Maybe TLS.ClientParams -> + HTTP2.Request -> + ByteString -> + Int -> + (StreamingResponse -> IO a) -> + IO a +withHTTP2Request mtlsConfig req hostname port k = do let clientConfig = HTTP2.ClientConfig "https" hostname {- cacheLimit: -} 20 - flip - E.catches - [ -- catch FederatorClientHTTP2Error (e.g. connection and TLS errors) - E.Handler (pure . Left), - -- catch HTTP2 exceptions - E.Handler (pure . Left . FederatorClientHTTP2Exception) - ] - $ bracket (connectSocket hostname port) NS.close $ \sock -> do - let withHTTP2Config k = case mtlsConfig of - Nothing -> bracket (HTTP2.allocSimpleConfig sock 4096) HTTP2.freeSimpleConfig k + E.handle (E.throw . FederatorClientHTTP2Exception) $ + bracket (connectSocket hostname port) NS.close $ \sock -> do + let withHTTP2Config k' = case mtlsConfig of + Nothing -> bracket (HTTP2.allocSimpleConfig sock 4096) HTTP2.freeSimpleConfig k' -- FUTUREWORK(federation): Use openssl Just tlsConfig -> do ctx <- E.handle (E.throw . FederatorClientTLSException) $ do ctx <- TLS.contextNew sock tlsConfig TLS.handshake ctx pure ctx - bracket (allocTLSConfig ctx 4096) freeTLSConfig k - withHTTP2Config $ \conf -> - HTTP2.run clientConfig conf $ \sendRequest -> do + bracket (allocTLSConfig ctx 4096) freeTLSConfig k' + withHTTP2Config $ \conf -> do + HTTP2.run clientConfig conf $ \sendRequest -> sendRequest req $ \resp -> do - result <- drainResponse resp let headers = headersFromTable (HTTP2.responseHeaders resp) - pure $ case HTTP2.responseStatus resp of - Nothing -> Left FederatorClientNoStatusCode - Just status -> Right (status, headers, result) + result = fromAction BS.null (HTTP2.getResponseBodyChunk resp) + case HTTP2.responseStatus resp of + Nothing -> E.throw FederatorClientNoStatusCode + Just status -> + k + Response + { responseStatusCode = status, + responseHeaders = Seq.fromList headers, + responseHttpVersion = HTTP.http20, + responseBody = result + } instance KnownComponent c => RunClient (FederatorClient c) where runRequestAcceptStatus expectedStatuses req = do - env <- ask - let baseUrlPath = - HTTP.encodePathSegments - [ "rpc", - domainText (ceTargetDomain env), - componentName (componentVal @c) - ] - let path = baseUrlPath <> requestPath req - body <- case requestBody req of - Just (RequestBodyLBS lbs, _) -> pure lbs - Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs) - Just (RequestBodySource _, _) -> - throwError FederatorClientStreamingNotSupported - Nothing -> pure mempty - let req' = - HTTP2.requestBuilder - (requestMethod req) - (LBS.toStrict (toLazyByteString path)) - (toList (requestHeaders req) <> [(originDomainHeaderName, toByteString' (ceOriginDomain env))]) - (lazyByteString body) - let Endpoint (Text.encodeUtf8 -> hostname) (fromIntegral -> port) = ceFederator env - eresp <- liftIO $ performHTTP2Request Nothing req' hostname port - case eresp of - Left err -> throwError (FederatorClientHTTP2Error err) - Right (status, headers, result) - | maybe (HTTP.statusIsSuccessful status) (elem status) expectedStatuses -> - pure $ - Response - { responseStatusCode = status, - responseHeaders = Seq.fromList headers, - responseHttpVersion = HTTP.http20, - responseBody = toLazyByteString result - } - | otherwise -> - throwError $ - FederatorClientError - ( mkFailureResponse - status - (ceTargetDomain env) - (toLazyByteString (requestPath req)) - (toLazyByteString result) - ) + let successfulStatus status = + maybe + (HTTP.statusIsSuccessful status) + (elem status) + expectedStatuses + withHTTP2StreamingRequest successfulStatus req $ \resp -> do + bdy <- + fmap (either (const mempty) (toLazyByteString . foldMap byteString)) + . runExceptT + . runSourceT + . responseBody + $ resp + pure $ resp $> bdy throwClientError = throwError . FederatorClientServantError +instance KnownComponent c => RunStreamingClient (FederatorClient c) where + withStreamingRequest = withHTTP2StreamingRequest HTTP.statusIsSuccessful + +streamingResponseStrictBody :: StreamingResponse -> IO Builder +streamingResponseStrictBody resp = + fmap (either stringUtf8 (foldMap byteString)) + . runExceptT + . runSourceT + . responseBody + $ resp + +withHTTP2StreamingRequest :: + forall c a. + KnownComponent c => + (HTTP.Status -> Bool) -> + Request -> + (StreamingResponse -> IO a) -> + FederatorClient c a +withHTTP2StreamingRequest successfulStatus req handleResponse = do + env <- ask + let baseUrlPath = + HTTP.encodePathSegments + [ "rpc", + domainText (ceTargetDomain env), + componentName (componentVal @c) + ] + let path = baseUrlPath <> requestPath req + body <- case requestBody req of + Just (RequestBodyLBS lbs, _) -> pure lbs + Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs) + Just (RequestBodySource _, _) -> + throwError FederatorClientStreamingNotSupported + Nothing -> pure mempty + let req' = + HTTP2.requestBuilder + (requestMethod req) + (LBS.toStrict (toLazyByteString path)) + (toList (requestHeaders req) <> [(originDomainHeaderName, toByteString' (ceOriginDomain env))]) + (lazyByteString body) + let Endpoint (Text.encodeUtf8 -> hostname) (fromIntegral -> port) = ceFederator env + resp <- + (either throwError pure =<<) . liftCodensity $ + Codensity $ \k -> + E.catch + (withHTTP2Request Nothing req' hostname port (k . Right)) + (k . Left . FederatorClientHTTP2Error) + + if successfulStatus (responseStatusCode resp) + then liftIO $ handleResponse resp + else do + -- in case of an error status code, read the whole body to construct the error + bdy <- liftIO $ streamingResponseStrictBody resp + throwError $ + FederatorClientError + ( mkFailureResponse + (responseStatusCode resp) + (ceTargetDomain env) + (toLazyByteString (requestPath req)) + (toLazyByteString bdy) + ) + mkFailureResponse :: HTTP.Status -> Domain -> LByteString -> LByteString -> Wai.Error mkFailureResponse status domain path body -- If the outward federator fails with 403, that means that there was an @@ -211,12 +262,25 @@ mkFailureResponse status domain path body "unknown-federation-error" (LText.decodeUtf8With Text.lenientDecode body) +-- | Run federator client synchronously. runFederatorClient :: KnownComponent c => FederatorClientEnv -> FederatorClient c a -> IO (Either FederatorClientError a) -runFederatorClient env action = runExceptT (runReaderT (unFederatorClient action) env) +runFederatorClient env = + lowerCodensity + . runFederatorClientToCodensity env + +runFederatorClientToCodensity :: + KnownComponent c => + FederatorClientEnv -> + FederatorClient c a -> + Codensity IO (Either FederatorClientError a) +runFederatorClientToCodensity env = + runExceptT + . flip runReaderT env + . unFederatorClient freeTLSConfig :: HTTP2.Config -> IO () freeTLSConfig cfg = free (HTTP2.confWriteBuffer cfg) diff --git a/libs/wire-api-federation/wire-api-federation.cabal b/libs/wire-api-federation/wire-api-federation.cabal index a1592e461e..5d665a791a 100644 --- a/libs/wire-api-federation/wire-api-federation.cabal +++ b/libs/wire-api-federation/wire-api-federation.cabal @@ -4,7 +4,7 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: 621c254076cf520b525269ca4fc550df57f410aea52a288f6cb68bd2d6f1ada3 +-- hash: f62744549c34b54e9900b31c33a4ebad5c51ac13ec44010ad2da6c6ca67fdc29 name: wire-api-federation version: 0.1.0 @@ -51,6 +51,7 @@ library , http-types , http2 , imports + , kan-extensions , lifted-base , metrics-wai , mtl @@ -110,6 +111,7 @@ test-suite spec , http-types , http2 , imports + , kan-extensions , lifted-base , metrics-wai , mtl diff --git a/services/federator/federator.cabal b/services/federator/federator.cabal index fca3d7731c..7bfdce95e1 100644 --- a/services/federator/federator.cabal +++ b/services/federator/federator.cabal @@ -4,7 +4,7 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: 2131bf1a367dd734cbccd900c4724c5b48e3f494501b2221e3f32ecaf0a12ec4 +-- hash: 371580e0c3adbdb4994c74a23ada11c91023bc654f2f0fe304ead7e88a3b8ba6 name: federator version: 1.0.0 @@ -83,6 +83,7 @@ library , http-types , http2 , imports + , kan-extensions , lens , metrics-core , metrics-wai @@ -94,7 +95,7 @@ library , polysemy-wire-zoo , retry , servant - , servant-server + , servant-client-core , streaming-commons , string-conversions , text @@ -149,6 +150,7 @@ executable federator , http-types , http2 , imports + , kan-extensions , lens , metrics-core , metrics-wai @@ -160,7 +162,7 @@ executable federator , polysemy-wire-zoo , retry , servant - , servant-server + , servant-client-core , streaming-commons , string-conversions , text @@ -224,6 +226,7 @@ executable federator-integration , http-types , http2 , imports + , kan-extensions , lens , metrics-core , metrics-wai @@ -237,7 +240,7 @@ executable federator-integration , random , retry , servant - , servant-server + , servant-client-core , streaming-commons , string-conversions , tasty @@ -308,6 +311,7 @@ test-suite federator-tests , http2 , imports , interpolate + , kan-extensions , lens , metrics-core , metrics-wai @@ -316,12 +320,11 @@ test-suite federator-tests , network-uri , pem , polysemy - , polysemy-mocks , polysemy-wire-zoo , retry , servant , servant-client - , servant-server + , servant-client-core , streaming-commons , string-conversions , tasty diff --git a/services/federator/package.yaml b/services/federator/package.yaml index bb4c0b9655..0c9abdd850 100644 --- a/services/federator/package.yaml +++ b/services/federator/package.yaml @@ -31,6 +31,7 @@ dependencies: - http-client-openssl - http-types - http2 +- kan-extensions - imports - lens - metrics-core @@ -43,7 +44,7 @@ dependencies: - polysemy-wire-zoo - retry - servant -- servant-server +- servant-client-core - streaming-commons - string-conversions - text @@ -111,7 +112,6 @@ tests: - directory - federator - interpolate - - polysemy-mocks - QuickCheck - servant-client - streaming-commons diff --git a/services/federator/src/Federator/App.hs b/services/federator/src/Federator/App.hs index e1601ed890..2a33cd9860 100644 --- a/services/federator/src/Federator/App.hs +++ b/services/federator/src/Federator/App.hs @@ -34,8 +34,6 @@ import Federator.Env (Env, applog, httpManager, requestId) import Imports import Polysemy import Polysemy.Input -import Servant.API.Generic () -import Servant.Server () import System.Logger.Class as LC import qualified System.Logger.Extended as Log diff --git a/services/federator/src/Federator/Env.hs b/services/federator/src/Federator/Env.hs index 4594b4fd85..ced29deb2b 100644 --- a/services/federator/src/Federator/Env.hs +++ b/services/federator/src/Federator/Env.hs @@ -21,7 +21,6 @@ module Federator.Env where import Bilge (RequestId) -import qualified Bilge as RPC import Control.Lens (makeLenses) import Data.Metrics (Metrics) import Data.X509.CertificateStore @@ -31,6 +30,7 @@ import Network.DNS.Resolver (Resolver) import qualified Network.HTTP.Client as HTTP import qualified Network.TLS as TLS import qualified System.Logger.Class as LC +import Util.Options import Wire.API.Federation.Component data TLSSettings = TLSSettings @@ -44,7 +44,7 @@ data Env = Env _requestId :: RequestId, _dnsResolver :: Resolver, _runSettings :: RunSettings, - _service :: Component -> RPC.Request, + _service :: Component -> Endpoint, _httpManager :: HTTP.Manager, _tls :: IORef TLSSettings } diff --git a/services/federator/src/Federator/ExternalServer.hs b/services/federator/src/Federator/ExternalServer.hs index 59546eb216..b81c7b1869 100644 --- a/services/federator/src/Federator/ExternalServer.hs +++ b/services/federator/src/Federator/ExternalServer.hs @@ -18,8 +18,9 @@ module Federator.ExternalServer (callInward, serveInward, parseRequestData, RequestData (..)) where import qualified Data.ByteString as BS -import Data.ByteString.Builder (toLazyByteString) +import Data.ByteString.Builder import qualified Data.ByteString.Lazy as LBS +import qualified Data.Sequence as Seq import qualified Data.Text as Text import Federator.Discovery import Federator.Env @@ -36,6 +37,7 @@ import Polysemy.Error import Polysemy.Input import Polysemy.TinyLog (TinyLog) import qualified Polysemy.TinyLog as Log +import Servant.Client.Core import qualified System.Logger.Message as Log import Wire.API.Federation.Component import Wire.API.Federation.Domain @@ -43,7 +45,7 @@ import Wire.API.Federation.Domain -- FUTUREWORK(federation): Versioning of the federation API. callInward :: Members - '[ Service, + '[ ServiceStreaming, Embed IO, TinyLog, DiscoverFederator, @@ -67,12 +69,18 @@ callInward wreq = do let path = LBS.toStrict (toLazyByteString (HTTP.encodePathSegments ["federation", rdRPC req])) - (status, body) <- serviceCall (rdComponent req) path (rdBody req) validatedDomain + resp <- serviceCall (rdComponent req) path (rdBody req) validatedDomain Log.debug $ Log.msg ("Inward Request response" :: ByteString) - . Log.field "status" (show status) - - pure $ Wai.responseLBS status defaultHeaders (fromMaybe mempty body) + . Log.field "status" (show (responseStatusCode resp)) + pure $ + streamingResponseToWai + resp + { responseHeaders = + Seq.filter + (\(name, _) -> name == "Content-Type") + (responseHeaders resp) + } data RequestData = RequestData { rdComponent :: Component, diff --git a/services/federator/src/Federator/InternalServer.hs b/services/federator/src/Federator/InternalServer.hs index a3fadf6f2e..5493448f32 100644 --- a/services/federator/src/Federator/InternalServer.hs +++ b/services/federator/src/Federator/InternalServer.hs @@ -65,6 +65,7 @@ import qualified Polysemy.Input as Polysemy import qualified Polysemy.Resource as Polysemy import Polysemy.TinyLog (TinyLog) import qualified Polysemy.TinyLog as Log +import Servant.Client.Core import qualified System.TimeManager as T import qualified System.X509 as TLS import Wire.API.Federation.Component @@ -118,14 +119,14 @@ callOutward req = do rd <- parseRequestData req domain <- parseDomainText (rdTargetDomain rd) ensureCanFederateWith domain - (status, result) <- + resp <- discoverAndCall domain (rdComponent rd) (rdRPC rd) (rdHeaders rd) (fromLazyByteString (rdBody rd)) - pure $ Wai.responseBuilder status defaultHeaders result + pure $ streamingResponseToWai resp serveOutward :: Env -> Int -> IO () serveOutward = serve callOutward diff --git a/services/federator/src/Federator/Remote.hs b/services/federator/src/Federator/Remote.hs index bbf5146da5..a206fcea36 100644 --- a/services/federator/src/Federator/Remote.hs +++ b/services/federator/src/Federator/Remote.hs @@ -26,7 +26,9 @@ module Federator.Remote ) where +import qualified Control.Exception as E import Control.Lens ((^.)) +import Control.Monad.Codensity import Data.Binary.Builder import Data.ByteString.Conversion (toByteString') import qualified Data.ByteString.Lazy as LBS @@ -49,6 +51,7 @@ import qualified Network.TLS.Extra.Cipher as TLS import Polysemy import Polysemy.Error import Polysemy.Input +import Servant.Client.Core import Wire.API.Federation.Client import Wire.API.Federation.Component import Wire.API.Federation.Error @@ -93,13 +96,13 @@ data Remote m a where Text -> [HTTP.Header] -> Builder -> - Remote m (HTTP.Status, Builder) + Remote m StreamingResponse makeSem ''Remote interpretRemote :: Members - '[ Embed IO, + '[ Embed (Codensity IO), DiscoverFederator, Error DiscoveryFailure, Error RemoteError, @@ -117,12 +120,21 @@ interpretRemote = interpret $ \case HTTP.encodePathSegments ["federation", componentName component, rpc] req' = HTTP2.requestBuilder HTTP.methodPost path headers body tlsConfig = mkTLSConfig settings hostname port - (status, _, result) <- - mapError (RemoteError target) . (fromEither =<<) . embed $ - performHTTP2Request (Just tlsConfig) req' hostname (fromIntegral port) - unless (HTTP.statusIsSuccessful status) $ - throw $ RemoteErrorResponse target status (toLazyByteString result) - pure (status, result) + + resp <- mapError (RemoteError target) . (fromEither @FederatorClientHTTP2Error =<<) . embed $ + Codensity $ \k -> + E.catch + (withHTTP2Request (Just tlsConfig) req' hostname (fromIntegral port) (k . Right)) + (k . Left) + + unless (HTTP.statusIsSuccessful (responseStatusCode resp)) $ do + bdy <- embed @(Codensity IO) . liftIO $ streamingResponseStrictBody resp + throw $ + RemoteErrorResponse + target + (responseStatusCode resp) + (toLazyByteString bdy) + pure resp mkTLSConfig :: TLSSettings -> ByteString -> Word16 -> TLS.ClientParams mkTLSConfig settings hostname port = diff --git a/services/federator/src/Federator/Response.hs b/services/federator/src/Federator/Response.hs index 7edab1111b..76e2649a85 100644 --- a/services/federator/src/Federator/Response.hs +++ b/services/federator/src/Federator/Response.hs @@ -20,10 +20,13 @@ module Federator.Response serve, runWaiError, runWaiErrors, + streamingResponseToWai, ) where import Control.Lens +import Control.Monad.Codensity +import Data.ByteString.Builder import Federator.Discovery import Federator.Env import Federator.Error @@ -39,10 +42,13 @@ import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Utilities.Error as Wai import qualified Network.Wai.Utilities.Server as Wai import Polysemy +import Polysemy.Embed import Polysemy.Error import Polysemy.Input import Polysemy.Internal import Polysemy.TinyLog +import Servant.Client.Core +import Servant.Types.SourceT import Wire.Network.DNS.Effect defaultHeaders :: [HTTP.Header] @@ -96,14 +102,13 @@ serve action env port = where app :: Wai.Application app req respond = - runFederator env (action req) - >>= respond + runCodensity (runFederator env (action req)) respond type AllEffects = '[ Remote, DiscoverFederator, DNSLookup, -- needed by DiscoverFederator - Service, + ServiceStreaming, Input RunSettings, Input TLSSettings, -- needed by Remote Input Env, -- needed by Service @@ -112,14 +117,16 @@ type AllEffects = Error ServerError, Error DiscoveryFailure, TinyLog, - Embed IO + Embed IO, + Embed (Codensity IO) ] -- | Run Sem action containing HTTP handlers. All errors have to been handled -- already by this point. -runFederator :: Env -> Sem AllEffects Wai.Response -> IO Wai.Response +runFederator :: Env -> Sem AllEffects Wai.Response -> Codensity IO Wai.Response runFederator env = - runM @IO + runM + . runEmbedded @IO @(Codensity IO) liftIO . runTinyLog (view applog env) -- FUTUREWORK: add request id . runWaiErrors @'[ ValidationError, @@ -130,7 +137,18 @@ runFederator env = . runInputConst env . runInputSem (embed @IO (readIORef (view tls env))) . runInputConst (view runSettings env) - . interpretService + . interpretServiceHTTP . runDNSLookupWithResolver (view dnsResolver env) . runFederatorDiscovery . interpretRemote + +streamingResponseToWai :: StreamingResponse -> Wai.Response +streamingResponseToWai resp = + let headers = toList (responseHeaders resp) + status = responseStatusCode resp + streamingBody output flush = + foreach + (const (pure ())) + (\chunk -> output (byteString chunk) *> flush) + (responseBody resp) + in Wai.responseStream status headers streamingBody diff --git a/services/federator/src/Federator/Run.hs b/services/federator/src/Federator/Run.hs index 3e9312a460..a41b259451 100644 --- a/services/federator/src/Federator/Run.hs +++ b/services/federator/src/Federator/Run.hs @@ -34,13 +34,11 @@ module Federator.Run ) where -import qualified Bilge as RPC import Control.Concurrent.Async import Control.Exception (bracket) import Control.Lens ((^.)) import Data.Default (def) import qualified Data.Metrics.Middleware as Metrics -import Data.Text.Encoding (encodeUtf8) import Federator.Env import Federator.ExternalServer (serveInward) import Federator.InternalServer (serveOutward) @@ -95,14 +93,12 @@ newEnv o _dnsResolver = do _applog <- LogExt.mkLogger (Opt.logLevel o) (Opt.logNetStrings o) (Opt.logFormat o) let _requestId = def let _runSettings = Opt.optSettings o - let _service Brig = mkEndpoint (Opt.brig o) - _service Galley = mkEndpoint (Opt.galley o) - _service Cargohold = mkEndpoint (Opt.cargohold o) + let _service Brig = Opt.brig o + _service Galley = Opt.galley o + _service Cargohold = Opt.cargohold o _httpManager <- initHttpManager _tls <- mkTLSSettingsOrThrow _runSettings >>= newIORef return Env {..} - where - mkEndpoint s = RPC.host (encodeUtf8 (s ^. epHost)) . RPC.port (s ^. epPort) $ RPC.empty closeEnv :: Env -> IO () closeEnv e = do diff --git a/services/federator/src/Federator/Service.hs b/services/federator/src/Federator/Service.hs index 99b024fcc3..9514f9ecf2 100644 --- a/services/federator/src/Federator/Service.hs +++ b/services/federator/src/Federator/Service.hs @@ -15,35 +15,57 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Federator.Service where +module Federator.Service + ( Service (..), + ServiceStreaming, + interpretServiceHTTP, + serviceCall, + ) +where -- FUTUREWORK(federation): Once we authenticate the call, we should send authentication data -- to brig so brig can do some authorization as required. import qualified Bilge as RPC -import Bilge.RPC (rpc') +import Control.Exception import Control.Lens (view) +import Control.Monad.Codensity +import qualified Data.ByteString as BS import Data.Domain +import qualified Data.Sequence as Seq import Data.String.Conversions (cs) -import qualified Data.Text.Lazy as LText -import Federator.App +import qualified Data.Text.Encoding as Text import Federator.Env import Imports +import Network.HTTP.Client import qualified Network.HTTP.Types as HTTP import Polysemy import Polysemy.Input +import Polysemy.TinyLog +import qualified Servant.Client.Core as Servant +import Servant.Types.SourceT +import Util.Options import Wire.API.Federation.Component import Wire.API.Federation.Domain (originDomainHeaderName) -newtype ServiceError = ServiceErrorInvalidStatus HTTP.Status - deriving (Eq, Show) +type ServiceStreaming = Service (SourceT IO ByteString) -data Service m a where - -- | Returns status and body, 'HTTP.Response' is not nice to work with in tests - ServiceCall :: Component -> ByteString -> LByteString -> Domain -> Service m (HTTP.Status, Maybe LByteString) +data Service body m a where + -- | Returns status, headers and body, 'HTTP.Response' is not nice to work with in tests + ServiceCall :: Component -> ByteString -> LByteString -> Domain -> Service body m (Servant.ResponseF body) makeSem ''Service +bodyReaderToStreamT :: Monad m => m ByteString -> SourceT m ByteString +bodyReaderToStreamT action = fromStepT go + where + go = Effect $ do + chunk <- action + pure $ + if BS.null chunk + then Stop + else Yield chunk go + -- FUTUREWORK(federation): Do we want to use servant client here? May make -- everything typed and safe -- @@ -52,19 +74,36 @@ makeSem ''Service -- -- FUTUREWORK: unify this interpretation with similar ones in Galley -- --- FUTUREWORK: does it make sense to use a lower level abstraction instead of bilge here? -interpretService :: - Members '[Embed IO, Input Env] r => - Sem (Service ': r) a -> +interpretServiceHTTP :: + Members '[Embed (Codensity IO), Input Env, TinyLog] r => + Sem (ServiceStreaming ': r) a -> Sem r a -interpretService = interpret $ \case - ServiceCall component path body domain -> embedApp @IO $ do - serviceReq <- view service <$> ask - res <- - rpc' (LText.pack (show component)) (serviceReq component) $ - RPC.method HTTP.POST - . RPC.path path - . RPC.body (RPC.RequestBodyLBS body) - . RPC.contentJson - . RPC.header originDomainHeaderName (cs (domainText domain)) - pure (RPC.responseStatus res, RPC.responseBody res) +interpretServiceHTTP = interpret $ \case + ServiceCall component rpcPath body domain -> do + Endpoint serviceHost servicePort <- inputs (view service) <*> pure component + manager <- inputs (view httpManager) + reqId <- inputs (view requestId) + let req = + defaultRequest + { method = HTTP.methodPost, + host = Text.encodeUtf8 serviceHost, + port = fromIntegral servicePort, + requestBody = RequestBodyLBS body, + path = rpcPath, + requestHeaders = + [ ("Content-Type", "application/json"), + (originDomainHeaderName, cs (domainText domain)), + (RPC.requestIdName, RPC.unRequestId reqId) + ] + } + + embed $ + Codensity $ \k -> + bracket (responseOpen req manager) responseClose $ \resp -> + k $ + Servant.Response + { Servant.responseStatusCode = responseStatus resp, + Servant.responseHeaders = Seq.fromList (responseHeaders resp), + Servant.responseHttpVersion = HTTP.http11, + Servant.responseBody = bodyReaderToStreamT (responseBody resp) + } diff --git a/services/federator/test/integration/Test/Federator/IngressSpec.hs b/services/federator/test/integration/Test/Federator/IngressSpec.hs index 04d80bb147..08d989b647 100644 --- a/services/federator/test/integration/Test/Federator/IngressSpec.hs +++ b/services/federator/test/integration/Test/Federator/IngressSpec.hs @@ -18,6 +18,7 @@ module Test.Federator.IngressSpec where import Control.Lens (view) +import Control.Monad.Codensity import qualified Data.Aeson as Aeson import Data.Binary.Builder import Data.Domain @@ -32,11 +33,14 @@ import Federator.Remote import Imports import qualified Network.HTTP.Types as HTTP import Polysemy +import Polysemy.Embed import Polysemy.Error import Polysemy.Input +import Servant.Client.Core import Test.Federator.Util import Test.Hspec import Util.Options (Endpoint (Endpoint)) +import Wire.API.Federation.Client import Wire.API.Federation.Component import Wire.API.Federation.Domain import Wire.API.User @@ -53,14 +57,15 @@ spec env = do _ <- putHandle brig (userId user) hdl let expectedProfile = (publicProfile user UserLegalHoldNoConsent) {profileHandle = Just (Handle hdl)} - (status, resp) <- + resp <- runTestSem . assertNoError @RemoteError $ inwardBrigCallViaIngress "get-user-by-handle" $ (Aeson.fromEncoding (Aeson.toEncoding hdl)) - let actualProfile = Aeson.decode (toLazyByteString resp) liftIO $ do - status `shouldBe` HTTP.status200 + bdy <- streamingResponseStrictBody resp + let actualProfile = Aeson.decode (toLazyByteString bdy) + responseStatusCode resp `shouldBe` HTTP.status200 actualProfile `shouldBe` (Just expectedProfile) it "should not be accessible without a client certificate" $ @@ -102,7 +107,7 @@ inwardBrigCallViaIngress :: Members [Input TestEnv, Embed IO, Error RemoteError] r => Text -> Builder -> - Sem r (HTTP.Status, Builder) + Sem r StreamingResponse inwardBrigCallViaIngress path payload = do tlsSettings <- inputs (view teTLSSettings) inwardBrigCallViaIngressWithSettings tlsSettings path payload @@ -112,7 +117,7 @@ inwardBrigCallViaIngressWithSettings :: TLSSettings -> Text -> Builder -> - Sem r (HTTP.Status, Builder) + Sem r StreamingResponse inwardBrigCallViaIngressWithSettings tlsSettings requestPath payload = do Endpoint ingressHost ingressPort <- cfgNginxIngress . view teTstOpts <$> input @@ -122,5 +127,6 @@ inwardBrigCallViaIngressWithSettings tlsSettings requestPath payload = runInputConst tlsSettings . assertNoError @DiscoveryFailure . discoverConst target + . runEmbedded @(Codensity IO) @IO lowerCodensity . interpretRemote $ discoverAndCall (Domain "example.com") Brig requestPath headers payload diff --git a/services/federator/test/unit/Test/Federator/Client.hs b/services/federator/test/unit/Test/Federator/Client.hs index a0d172050f..854ac57087 100644 --- a/services/federator/test/unit/Test/Federator/Client.hs +++ b/services/federator/test/unit/Test/Federator/Client.hs @@ -18,14 +18,26 @@ module Test.Federator.Client (tests) where import Control.Exception hiding (handle) +import Control.Monad.Codensity +import Control.Monad.Except import qualified Data.Aeson as Aeson import Data.Bifunctor (first) +import qualified Data.ByteString as BS +import Data.ByteString.Builder (Builder, byteString, toLazyByteString) +import qualified Data.ByteString.Lazy as LBS import Data.Domain +import Data.Proxy +import qualified Data.Text.Encoding as Text import Federator.MockServer import Imports import Network.HTTP.Types as HTTP import qualified Network.HTTP2.Client as HTTP2 -import Network.Wai.Utilities.Error as Wai +import qualified Network.Wai as Wai +import qualified Network.Wai.Utilities.Error as Wai +import Servant.API +import Servant.Client +import Servant.Client.Core +import Servant.Types.SourceT import Test.QuickCheck (arbitrary, generate) import Test.Tasty import Test.Tasty.HUnit @@ -53,6 +65,7 @@ tests = [ testGroup "Servant" [ testCase "testClientSuccess" testClientSuccess, + testCase "testClientStreaming" testClientStreaming, testCase "testClientFailure" testClientFailure, testCase "testFederatorFailure" testFederatorFailure, testCase "testClientException" testClientExceptions, @@ -60,7 +73,8 @@ tests = ], testGroup "HTTP2 client" - [ testCase "testResponseHeaders" testResponseHeaders + [ testCase "testResponseHeaders" testResponseHeaders, + testCase "testStreaming" testStreaming ] ] @@ -106,6 +120,25 @@ testClientSuccess = do ] first (const ()) actualResponse @?= Right (Just expectedResponse) +type StreamingAPI = StreamGet NewlineFraming PlainText (SourceIO Text) + +testClientStreaming :: IO () +testClientStreaming = withInfiniteMockServer $ \port -> do + let env = + FederatorClientEnv + { ceOriginDomain = originDomain, + ceTargetDomain = targetDomain, + ceFederator = Endpoint "127.0.0.1" (fromIntegral port) + } + let c = clientIn (Proxy @StreamingAPI) (Proxy @(FederatorClient 'Brig)) + runCodensity (runFederatorClientToCodensity env c) $ \eout -> + case eout of + Left err -> assertFailure $ "Unexpected error: " <> displayException err + Right out -> do + let expected = mconcat (replicate 500 "Hello") + actual <- takeSourceT (fromIntegral (LBS.length expected)) (fmap Text.encodeUtf8 out) + actual @?= expected + testClientFailure :: IO () testClientFailure = do handle <- generate arbitrary @@ -181,6 +214,40 @@ testResponseHeaders = do Left err -> assertFailure $ "Unexpected error while connecting to mock federator: " <> show err - Right (status, headers, _) -> do - status @?= HTTP.status200 - lookup "X-Foo" headers @?= Just "bar" + Right resp -> do + responseStatusCode resp @?= HTTP.status200 + lookup "X-Foo" (toList (responseHeaders resp)) @?= Just "bar" + +testStreaming :: IO () +testStreaming = withInfiniteMockServer $ \port -> do + let req = HTTP2.requestBuilder HTTP.methodPost "test" [] mempty + withHTTP2Request Nothing req "127.0.0.1" port $ \resp -> do + let expected = mconcat (replicate 512 "Hello\n") + actual <- takeSourceT (fromIntegral (LBS.length expected)) (responseBody resp) + actual @?= expected + +withInfiniteMockServer :: (Int -> IO a) -> IO a +withInfiniteMockServer k = bracket (startMockServer Nothing app) fst (k . snd) + where + app _ respond = respond $ + Wai.responseStream HTTP.ok200 mempty $ \write flush -> + let go n = do + when (n == 0) $ flush + write (byteString "Hello\n") *> go (if n == 0 then 100 else n - 1) + in go (1000 :: Int) + +-- SourceT utilities + +takeStepT :: Builder -> Int -> StepT IO ByteString -> IO LByteString +takeStepT acc _ Stop = pure (toLazyByteString acc) +takeStepT acc _ (Error _) = pure (toLazyByteString acc) +takeStepT acc s (Skip next) = takeStepT acc s next +takeStepT acc s (Yield chunk next) + | BS.length chunk >= s = + pure $ toLazyByteString (acc <> byteString (BS.take s chunk)) + | otherwise = do + takeStepT (acc <> byteString chunk) (s - BS.length chunk) next +takeStepT acc s (Effect m) = m >>= takeStepT acc s + +takeSourceT :: Int -> SourceT IO ByteString -> IO LByteString +takeSourceT s m = unSourceT m (takeStepT mempty s) diff --git a/services/federator/test/unit/Test/Federator/ExternalServer.hs b/services/federator/test/unit/Test/Federator/ExternalServer.hs index a1c749cec4..a70934f8da 100644 --- a/services/federator/test/unit/Test/Federator/ExternalServer.hs +++ b/services/federator/test/unit/Test/Federator/ExternalServer.hs @@ -26,7 +26,7 @@ import qualified Data.Text.Encoding as Text import Federator.Discovery import Federator.Error.ServerError (ServerError (..)) import Federator.ExternalServer -import Federator.Service (Service) +import Federator.Service (Service (..), ServiceStreaming) import Federator.Validation import Imports import qualified Network.HTTP.Types as HTTP @@ -35,18 +35,17 @@ import qualified Network.Wai.Utilities.Server as Wai import Polysemy import Polysemy.Error import Polysemy.Input -import qualified Polysemy.TinyLog as TinyLog +import Polysemy.Output +import Polysemy.TinyLog +import qualified Servant.Client.Core as Servant +import Servant.Types.SourceT import Test.Federator.Options (noClientCertSettings) import Test.Federator.Util import Test.Federator.Validation (mockDiscoveryTrivial) -import Test.Polysemy.Mock (Mock (mock), evalMock) -import Test.Polysemy.Mock.TH (genMock) import Test.Tasty import Test.Tasty.HUnit import Wire.API.Federation.Component -genMock ''Service - tests :: TestTree tests = testGroup @@ -72,59 +71,79 @@ exampleRequest certFile path = do trBody = "\"foo\"" } +data Call = Call + { cComponent :: Component, + cPath :: ByteString, + cBody :: LByteString, + cDomain :: Domain + } + deriving (Eq, Show) + +mockService :: + Members [Output Call, Embed IO] r => + HTTP.Status -> + Sem (ServiceStreaming ': r) a -> + Sem r a +mockService status = interpret $ \case + ServiceCall comp path body domain -> do + output (Call comp path body domain) + pure + Servant.Response + { Servant.responseStatusCode = status, + Servant.responseHeaders = mempty, + Servant.responseHttpVersion = HTTP.http11, + Servant.responseBody = source ["\"bar\""] + } + requestBrigSuccess :: TestTree requestBrigSuccess = - testCase "should translate response from brig to 'InwardResponseBody' when response has status 200" $ do + testCase "should forward response from brig when status is 200" $ do request <- exampleRequest "test/resources/unit/localhost.example.com.pem" "/federation/brig/get-user-by-handle" - runM . evalMock @Service @IO $ do - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.ok200, Just "\"bar\"")) - - res <- - mock @Service @IO - . assertNoError @ValidationError - . assertNoError @DiscoveryFailure - . assertNoError @ServerError - . TinyLog.discardLogs - . mockDiscoveryTrivial - . runInputConst noClientCertSettings - $ callInward request - actualCalls <- mockServiceCallCalls @IO - let expectedCall = (Brig, "/federation/get-user-by-handle", "\"foo\"", aValidDomain) - embed $ assertEqual "one call to brig should be made" [expectedCall] actualCalls - embed $ Wai.responseStatus res @?= HTTP.status200 - body <- embed $ Wai.lazyResponseBody res - embed $ body @?= "\"bar\"" + (actualCalls, res) <- + runM + . runOutputList + . mockService HTTP.ok200 + . assertNoError @ValidationError + . assertNoError @DiscoveryFailure + . assertNoError @ServerError + . discardLogs + . mockDiscoveryTrivial + . runInputConst noClientCertSettings + $ callInward request + let expectedCall = Call Brig "/federation/get-user-by-handle" "\"foo\"" aValidDomain + assertEqual "one call to brig should be made" [expectedCall] actualCalls + Wai.responseStatus res @?= HTTP.status200 + body <- Wai.lazyResponseBody res + body @?= "\"bar\"" requestBrigFailure :: TestTree requestBrigFailure = - testCase "should translate response from brig to 'InwardResponseError' when response has status 404" $ do + testCase "should preserve the status code returned by the service" $ do request <- exampleRequest "test/resources/unit/localhost.example.com.pem" "/federation/brig/get-user-by-handle" - runM . evalMock @Service @IO $ do - let brigResponseBody = "response body" - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.notFound404, Just brigResponseBody)) - res <- - mock @Service @IO - . assertNoError @ValidationError - . assertNoError @DiscoveryFailure - . assertNoError @ServerError - . TinyLog.discardLogs - . mockDiscoveryTrivial - . runInputConst noClientCertSettings - $ callInward request - - actualCalls <- mockServiceCallCalls @IO - let expectedCall = (Brig, "/federation/get-user-by-handle", "\"foo\"", aValidDomain) - embed $ assertEqual "one call to brig should be made" [expectedCall] actualCalls - embed $ Wai.responseStatus res @?= HTTP.notFound404 - body <- embed $ Wai.lazyResponseBody res - embed $ body @?= brigResponseBody + (actualCalls, res) <- + runM + . runOutputList + . mockService HTTP.notFound404 + . assertNoError @ValidationError + . assertNoError @DiscoveryFailure + . assertNoError @ServerError + . discardLogs + . mockDiscoveryTrivial + . runInputConst noClientCertSettings + $ callInward request + + let expectedCall = Call Brig "/federation/get-user-by-handle" "\"foo\"" aValidDomain + assertEqual "one call to brig should be made" [expectedCall] actualCalls + Wai.responseStatus res @?= HTTP.notFound404 + body <- Wai.lazyResponseBody res + body @?= "\"bar\"" requestGalleySuccess :: TestTree requestGalleySuccess = @@ -134,20 +153,18 @@ requestGalleySuccess = "test/resources/unit/localhost.example.com.pem" "/federation/galley/get-conversations" - runM . evalMock @Service @IO $ do - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.ok200, Just "\"bar\"")) - - res <- - mock @Service @IO + runM $ do + (actualCalls, res) <- + runOutputList + . mockService HTTP.ok200 . assertNoError @ValidationError . assertNoError @DiscoveryFailure . assertNoError @ServerError - . TinyLog.discardLogs + . discardLogs . mockDiscoveryTrivial . runInputConst noClientCertSettings $ callInward request - actualCalls <- mockServiceCallCalls @IO - let expectedCall = (Galley, "/federation/get-conversations", "\"foo\"", aValidDomain) + let expectedCall = Call Galley "/federation/get-conversations" "\"foo\"" aValidDomain embed $ assertEqual "one call to galley should be made" [expectedCall] actualCalls embed $ Wai.responseStatus res @?= HTTP.status200 body <- embed $ Wai.lazyResponseBody res @@ -164,20 +181,18 @@ requestNoDomain = trPath = "/federation/brig/get-users" } - runM . evalMock @Service @IO $ do - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.ok200, Just "\"bar\"")) - - res <- - runError - . mock @Service @IO + runM $ do + (actualCalls, res) <- + runOutputList @Call + . mockService HTTP.ok200 + . runError . assertNoError @ValidationError . assertNoError @DiscoveryFailure - . TinyLog.discardLogs + . discardLogs . mockDiscoveryTrivial . runInputConst noClientCertSettings $ callInward request - actualCalls <- mockServiceCallCalls @IO embed $ assertEqual "no calls to services should be made" [] actualCalls embed $ void res @?= Left NoOriginDomain @@ -191,22 +206,20 @@ requestNoCertificate = trPath = "/federation/brig/get-users" } - runM . evalMock @Service @IO $ do - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.ok200, Just "\"bar\"")) - - res <- - runError - . mock @Service @IO - . assertNoError @ServerError - . assertNoError @DiscoveryFailure - . TinyLog.discardLogs - . mockDiscoveryTrivial - . runInputConst noClientCertSettings - $ callInward request - - actualCalls <- mockServiceCallCalls @IO - embed $ assertEqual "no calls to services should be made" [] actualCalls - embed $ void res @?= Left NoClientCertificate + (actualCalls, res) <- + runM + . runOutputList @Call + . mockService HTTP.ok200 + . runError + . assertNoError @ServerError + . assertNoError @DiscoveryFailure + . discardLogs + . mockDiscoveryTrivial + . runInputConst noClientCertSettings + $ callInward request + + assertEqual "no calls to services should be made" [] actualCalls + void res @?= Left NoClientCertificate testInvalidPaths :: TestTree testInvalidPaths = do @@ -244,23 +257,20 @@ testInvalidPaths = do "test/resources/unit/localhost.example.com.pem" invalidPath - runM . evalMock @Service @IO $ do - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.ok200, Just "\"bar\"")) - - res <- - runError @ServerError - . mock @Service @IO - . assertNoError @ValidationError - . assertNoError @DiscoveryFailure - . TinyLog.discardLogs - . mockDiscoveryTrivial - . runInputConst noClientCertSettings - $ callInward request - - embed $ assertEqual ("Expected request with path \"" <> cs invalidPath <> "\" to fail") (Left InvalidRoute) (void res) + (actualCalls, res) <- + runM + . runOutputList @Call + . mockService HTTP.ok200 + . runError @ServerError + . assertNoError @ValidationError + . assertNoError @DiscoveryFailure + . discardLogs + . mockDiscoveryTrivial + . runInputConst noClientCertSettings + $ callInward request - actualCalls <- mockServiceCallCalls @IO - embed $ assertEqual "no calls to any service should be made" [] actualCalls + assertEqual ("Expected request with path \"" <> cs invalidPath <> "\" to fail") (Left InvalidRoute) (void res) + assertEqual "no calls to any service should be made" [] actualCalls testInvalidComponent :: TestTree testInvalidComponent = @@ -270,22 +280,20 @@ testInvalidComponent = "test/resources/unit/localhost.example.com.pem" "/federation/mast/get-users" - runM . evalMock @Service @IO $ do - mockServiceCallReturns @IO (\_ _ _ _ -> pure (HTTP.ok200, Just "\"bar\"")) - - res <- - runError @ServerError - . mock @Service @IO - . assertNoError @ValidationError - . assertNoError @DiscoveryFailure - . TinyLog.discardLogs - . mockDiscoveryTrivial - . runInputConst noClientCertSettings - $ callInward request - - embed $ void res @?= Left (UnknownComponent "mast") - actualCalls <- mockServiceCallCalls @IO - embed $ assertEqual "no calls to any service should be made" [] actualCalls + (actualCalls, res) <- + runM + . runOutputList @Call + . mockService HTTP.ok200 + . runError @ServerError + . assertNoError @ValidationError + . assertNoError @DiscoveryFailure + . discardLogs + . mockDiscoveryTrivial + . runInputConst noClientCertSettings + $ callInward request + + void res @?= Left (UnknownComponent "mast") + assertEqual "no calls to any service should be made" [] actualCalls testMethod :: TestTree testMethod = @@ -304,10 +312,10 @@ testMethod = res <- runM . runError @ServerError - . interpret @Service (\_ -> embed $ assertFailure "unexpected call to service") + . interpret @ServiceStreaming (\_ -> embed $ assertFailure "unexpected call to service") . assertNoError @ValidationError . assertNoError @DiscoveryFailure - . TinyLog.discardLogs + . discardLogs . mockDiscoveryTrivial . runInputConst noClientCertSettings $ callInward request diff --git a/services/federator/test/unit/Test/Federator/InternalServer.hs b/services/federator/test/unit/Test/Federator/InternalServer.hs index 3def6a2c2a..db3393c529 100644 --- a/services/federator/test/unit/Test/Federator/InternalServer.hs +++ b/services/federator/test/unit/Test/Federator/InternalServer.hs @@ -19,7 +19,7 @@ module Test.Federator.InternalServer (tests) where -import Data.Binary.Builder +import Data.ByteString.Builder import Data.ByteString.Conversion import Data.Default import Data.Domain @@ -36,6 +36,8 @@ import Polysemy import Polysemy.Error import Polysemy.Input import Polysemy.TinyLog +import Servant.Client.Core +import Servant.Types.SourceT import Test.Federator.Options (noClientCertSettings) import Test.Federator.Util import Test.Tasty @@ -78,7 +80,13 @@ federatedRequestSuccess = rpc @?= "get-user-by-handle" headers @?= requestHeaders toLazyByteString body @?= "\"foo\"" - pure (HTTP.status200, fromLazyByteString "\"bar\"") + pure + Response + { responseStatusCode = HTTP.ok200, + responseHeaders = mempty, + responseHttpVersion = HTTP.http20, + responseBody = source ["\"bar\""] + } res <- runM . interpretCall @@ -107,7 +115,14 @@ federatedRequestFailureAllowList = let checkRequest :: Sem (Remote ': r) a -> Sem r a checkRequest = interpret $ \case - DiscoverAndCall {} -> pure (HTTP.status200, fromLazyByteString "\"bar\"") + DiscoverAndCall {} -> + pure + Response + { responseStatusCode = HTTP.ok200, + responseHeaders = mempty, + responseHttpVersion = HTTP.http20, + responseBody = source ["\"bar\""] + } eith <- runM diff --git a/services/federator/test/unit/Test/Federator/Remote.hs b/services/federator/test/unit/Test/Federator/Remote.hs index 66689a2629..ce13842fb9 100644 --- a/services/federator/test/unit/Test/Federator/Remote.hs +++ b/services/federator/test/unit/Test/Federator/Remote.hs @@ -18,6 +18,7 @@ module Test.Federator.Remote where import Control.Exception (bracket) +import Control.Monad.Codensity import Data.Domain import Federator.Discovery import Federator.Env (TLSSettings) @@ -31,6 +32,7 @@ import Network.Wai import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.WarpTLS as Warp import Polysemy +import Polysemy.Embed import Polysemy.Error import Polysemy.Input import Test.Federator.Options (defRunSettings) @@ -83,6 +85,7 @@ mkTestCall tlsSettings port = . runInputConst tlsSettings . discoverLocalhost port . assertNoError @DiscoveryFailure + . runEmbedded @(Codensity IO) @IO lowerCodensity . interpretRemote $ discoverAndCall (Domain "localhost") Brig "test" [] mempty diff --git a/stack.yaml b/stack.yaml index 12671354d5..6081f77af1 100644 --- a/stack.yaml +++ b/stack.yaml @@ -161,7 +161,6 @@ extra-deps: - markov-chain-usage-model-0.0.0 - wai-predicates-1.0.0 - redis-io-1.1.0 -- polysemy-mocks-0.2.0.0 - warp-3.3.17 # Not latest as last one breaks wai-routing @@ -208,10 +207,9 @@ extra-deps: - git: https://github.com/dpwright/HaskellNet-SSL commit: ca84ef29a93eaef7673fa58056cdd8dae1568d2d # master (Sep 14, 2020) -# Fix for connection preface race condition -# https://github.com/kazu-yamamoto/http2/pull/33 +# Fix for server sending too many empty data frames - git: https://github.com/wireapp/http2 - commit: 1ee1ce432d923839dab6782410e91dc17df2a880 # preface-race branch + commit: aa3501ad58e1abbd196781fac25a84f41ec2a787 # avoid-empty-data branch # Fix in PR: https://github.com/bos/snappy/pull/7 - git: https://github.com/wireapp/snappy diff --git a/stack.yaml.lock b/stack.yaml.lock index 6da7fa2060..c9b4d4ccfc 100644 --- a/stack.yaml.lock +++ b/stack.yaml.lock @@ -447,13 +447,6 @@ packages: sha256: db61f70aa7387090c26ccca0545ffdeea0adfcf93b76d5eaf6a954c0e5a34064 original: hackage: redis-io-1.1.0 -- completed: - hackage: polysemy-mocks-0.2.0.0@sha256:ed7b4aa8ee29995d0b840ac0c131a141636ca46493b82706b7e5ec5e33b9ffa7,1441 - pantry-tree: - size: 695 - sha256: 8218e3dde278ca1f01d19009fad40f603e1b17993a519d15fe6319e3a827cc01 - original: - hackage: polysemy-mocks-0.2.0.0 - completed: hackage: warp-3.3.17@sha256:3a3ea203141d00d2244b511ee99174b8ed58fc862552755d470a25a44ce5275b,10910 pantry-tree: @@ -646,11 +639,11 @@ packages: git: https://github.com/wireapp/http2 pantry-tree: size: 52771 - sha256: dc6d3868a049d2ed38ef16ca6dd6aeb6b8e8a1e730c664ecdd243ffdb45ee750 - commit: 1ee1ce432d923839dab6782410e91dc17df2a880 + sha256: 71040c20c8e6a766b6b309c03dbc970062b15e450a63e05f8d095a87cdb5082f + commit: aa3501ad58e1abbd196781fac25a84f41ec2a787 original: git: https://github.com/wireapp/http2 - commit: 1ee1ce432d923839dab6782410e91dc17df2a880 + commit: aa3501ad58e1abbd196781fac25a84f41ec2a787 - completed: name: snappy version: 0.2.0.2