Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/wireapp/http2
tag: 1ee1ce432d923839dab6782410e91dc17df2a880
tag: aa3501ad58e1abbd196781fac25a84f41ec2a787

source-repository-package
type: git
Expand Down
1 change: 0 additions & 1 deletion cabal.project.freeze
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,6 @@ constraints: any.AC-Angle ==1.0,
any.polyparse ==1.13,
any.polysemy ==1.7.0.0,
any.polysemy-check ==0.8.1.0,
any.polysemy-mocks ==0.2.0.0,
any.polysemy-plugin ==0.4.2.0,
any.pooled-io ==0.0.2.2,
any.port-utils ==0.2.1.0,
Expand Down
1 change: 1 addition & 0 deletions changelog.d/6-federation/federator-streaming
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make federator capable of streaming responses
1 change: 1 addition & 0 deletions libs/wire-api-federation/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies:
- http-types
- http2
- imports
- kan-extensions
- lifted-base
- metrics-wai
- mtl
Expand Down
204 changes: 134 additions & 70 deletions libs/wire-api-federation/src/Wire/API/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ module Wire.API.Federation.Client
( FederatorClientEnv (..),
FederatorClient,
runFederatorClient,
runFederatorClientToCodensity,
performHTTP2Request,
withHTTP2Request,
streamingResponseStrictBody,
headersFromTable,
)
where

import qualified Control.Exception as E
import Control.Monad.Catch
import Control.Monad.Codensity
import Control.Monad.Except
import qualified Data.Aeson as Aeson
import qualified Data.ByteString as BS
Expand All @@ -52,6 +56,7 @@ import Network.TLS as TLS
import qualified Network.Wai.Utilities.Error as Wai
import Servant.Client
import Servant.Client.Core
import Servant.Types.SourceT
import qualified System.TimeManager
import Util.Options (Endpoint (..))
import Wire.API.Federation.Component
Expand All @@ -65,7 +70,7 @@ data FederatorClientEnv = FederatorClientEnv
}

newtype FederatorClient (c :: Component) a = FederatorClient
{unFederatorClient :: ReaderT FederatorClientEnv (ExceptT FederatorClientError IO) a}
{unFederatorClient :: ReaderT FederatorClientEnv (ExceptT FederatorClientError (Codensity IO)) a}
deriving newtype
( Functor,
Applicative,
Expand All @@ -75,6 +80,9 @@ newtype FederatorClient (c :: Component) a = FederatorClient
MonadIO
)

liftCodensity :: Codensity IO a -> FederatorClient c a
liftCodensity = FederatorClient . lift . lift

headersFromTable :: HTTP2.HeaderTable -> [HTTP.Header]
headersFromTable (headerList, _) = flip map headerList $ \(token, headerValue) ->
(HTTP2.tokenKey token, headerValue)
Expand All @@ -90,93 +98,136 @@ performHTTP2Request ::
HTTP2.Request ->
ByteString ->
Int ->
IO (Either FederatorClientHTTP2Error (HTTP.Status, [HTTP.Header], Builder))
performHTTP2Request mtlsConfig req hostname port = do
let drainResponse resp = go mempty
where
go acc = do
chunk <- HTTP2.getResponseBodyChunk resp
if BS.null chunk
then pure acc
else go (acc <> byteString chunk)
IO (Either FederatorClientHTTP2Error (ResponseF Builder))
performHTTP2Request mtlsConfig req hostname port = try $ do
withHTTP2Request mtlsConfig req hostname port $ \resp -> do
b <-
fmap (either (const mempty) id)
. runExceptT
. runSourceT
. responseBody
$ resp
pure $ resp $> foldMap byteString b

withHTTP2Request ::
Maybe TLS.ClientParams ->
HTTP2.Request ->
ByteString ->
Int ->
(StreamingResponse -> IO a) ->
IO a
withHTTP2Request mtlsConfig req hostname port k = do
let clientConfig =
HTTP2.ClientConfig
"https"
hostname
{- cacheLimit: -} 20
flip
E.catches
[ -- catch FederatorClientHTTP2Error (e.g. connection and TLS errors)
E.Handler (pure . Left),
-- catch HTTP2 exceptions
E.Handler (pure . Left . FederatorClientHTTP2Exception)
]
$ bracket (connectSocket hostname port) NS.close $ \sock -> do
let withHTTP2Config k = case mtlsConfig of
Nothing -> bracket (HTTP2.allocSimpleConfig sock 4096) HTTP2.freeSimpleConfig k
E.handle (E.throw . FederatorClientHTTP2Exception) $
bracket (connectSocket hostname port) NS.close $ \sock -> do
let withHTTP2Config k' = case mtlsConfig of
Nothing -> bracket (HTTP2.allocSimpleConfig sock 4096) HTTP2.freeSimpleConfig k'
-- FUTUREWORK(federation): Use openssl
Just tlsConfig -> do
ctx <- E.handle (E.throw . FederatorClientTLSException) $ do
ctx <- TLS.contextNew sock tlsConfig
TLS.handshake ctx
pure ctx
bracket (allocTLSConfig ctx 4096) freeTLSConfig k
withHTTP2Config $ \conf ->
HTTP2.run clientConfig conf $ \sendRequest -> do
bracket (allocTLSConfig ctx 4096) freeTLSConfig k'
withHTTP2Config $ \conf -> do
HTTP2.run clientConfig conf $ \sendRequest ->
sendRequest req $ \resp -> do
result <- drainResponse resp
let headers = headersFromTable (HTTP2.responseHeaders resp)
pure $ case HTTP2.responseStatus resp of
Nothing -> Left FederatorClientNoStatusCode
Just status -> Right (status, headers, result)
result = fromAction BS.null (HTTP2.getResponseBodyChunk resp)
case HTTP2.responseStatus resp of
Nothing -> E.throw FederatorClientNoStatusCode
Just status ->
k
Response
{ responseStatusCode = status,
responseHeaders = Seq.fromList headers,
responseHttpVersion = HTTP.http20,
responseBody = result
}

instance KnownComponent c => RunClient (FederatorClient c) where
runRequestAcceptStatus expectedStatuses req = do
env <- ask
let baseUrlPath =
HTTP.encodePathSegments
[ "rpc",
domainText (ceTargetDomain env),
componentName (componentVal @c)
]
let path = baseUrlPath <> requestPath req
body <- case requestBody req of
Just (RequestBodyLBS lbs, _) -> pure lbs
Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs)
Just (RequestBodySource _, _) ->
throwError FederatorClientStreamingNotSupported
Nothing -> pure mempty
let req' =
HTTP2.requestBuilder
(requestMethod req)
(LBS.toStrict (toLazyByteString path))
(toList (requestHeaders req) <> [(originDomainHeaderName, toByteString' (ceOriginDomain env))])
(lazyByteString body)
let Endpoint (Text.encodeUtf8 -> hostname) (fromIntegral -> port) = ceFederator env
eresp <- liftIO $ performHTTP2Request Nothing req' hostname port
case eresp of
Left err -> throwError (FederatorClientHTTP2Error err)
Right (status, headers, result)
| maybe (HTTP.statusIsSuccessful status) (elem status) expectedStatuses ->
pure $
Response
{ responseStatusCode = status,
responseHeaders = Seq.fromList headers,
responseHttpVersion = HTTP.http20,
responseBody = toLazyByteString result
}
| otherwise ->
throwError $
FederatorClientError
( mkFailureResponse
status
(ceTargetDomain env)
(toLazyByteString (requestPath req))
(toLazyByteString result)
)
let successfulStatus status =
maybe
(HTTP.statusIsSuccessful status)
(elem status)
expectedStatuses
withHTTP2StreamingRequest successfulStatus req $ \resp -> do
bdy <-
fmap (either (const mempty) (toLazyByteString . foldMap byteString))
. runExceptT
. runSourceT
. responseBody
$ resp
pure $ resp $> bdy

throwClientError = throwError . FederatorClientServantError

instance KnownComponent c => RunStreamingClient (FederatorClient c) where
withStreamingRequest = withHTTP2StreamingRequest HTTP.statusIsSuccessful

streamingResponseStrictBody :: StreamingResponse -> IO Builder
streamingResponseStrictBody resp =
fmap (either stringUtf8 (foldMap byteString))
. runExceptT
. runSourceT
. responseBody
$ resp

withHTTP2StreamingRequest ::
forall c a.
KnownComponent c =>
(HTTP.Status -> Bool) ->
Request ->
(StreamingResponse -> IO a) ->
FederatorClient c a
withHTTP2StreamingRequest successfulStatus req handleResponse = do
env <- ask
let baseUrlPath =
HTTP.encodePathSegments
[ "rpc",
domainText (ceTargetDomain env),
componentName (componentVal @c)
]
let path = baseUrlPath <> requestPath req
body <- case requestBody req of
Just (RequestBodyLBS lbs, _) -> pure lbs
Just (RequestBodyBS bs, _) -> pure (LBS.fromStrict bs)
Just (RequestBodySource _, _) ->
throwError FederatorClientStreamingNotSupported
Nothing -> pure mempty
let req' =
HTTP2.requestBuilder
(requestMethod req)
(LBS.toStrict (toLazyByteString path))
(toList (requestHeaders req) <> [(originDomainHeaderName, toByteString' (ceOriginDomain env))])
(lazyByteString body)
let Endpoint (Text.encodeUtf8 -> hostname) (fromIntegral -> port) = ceFederator env
resp <-
(either throwError pure =<<) . liftCodensity $
Codensity $ \k ->
E.catch
(withHTTP2Request Nothing req' hostname port (k . Right))
(k . Left . FederatorClientHTTP2Error)

if successfulStatus (responseStatusCode resp)
then liftIO $ handleResponse resp
else do
-- in case of an error status code, read the whole body to construct the error
bdy <- liftIO $ streamingResponseStrictBody resp
throwError $
FederatorClientError
( mkFailureResponse
(responseStatusCode resp)
(ceTargetDomain env)
(toLazyByteString (requestPath req))
(toLazyByteString bdy)
)

mkFailureResponse :: HTTP.Status -> Domain -> LByteString -> LByteString -> Wai.Error
mkFailureResponse status domain path body
-- If the outward federator fails with 403, that means that there was an
Expand Down Expand Up @@ -211,12 +262,25 @@ mkFailureResponse status domain path body
"unknown-federation-error"
(LText.decodeUtf8With Text.lenientDecode body)

-- | Run federator client synchronously.
runFederatorClient ::
KnownComponent c =>
FederatorClientEnv ->
FederatorClient c a ->
IO (Either FederatorClientError a)
runFederatorClient env action = runExceptT (runReaderT (unFederatorClient action) env)
runFederatorClient env =
lowerCodensity
. runFederatorClientToCodensity env

runFederatorClientToCodensity ::
KnownComponent c =>
FederatorClientEnv ->
FederatorClient c a ->
Codensity IO (Either FederatorClientError a)
runFederatorClientToCodensity env =
runExceptT
. flip runReaderT env
. unFederatorClient

freeTLSConfig :: HTTP2.Config -> IO ()
freeTLSConfig cfg = free (HTTP2.confWriteBuffer cfg)
Expand Down
4 changes: 3 additions & 1 deletion libs/wire-api-federation/wire-api-federation.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ cabal-version: 1.12
--
-- see: https://github.com/sol/hpack
--
-- hash: 621c254076cf520b525269ca4fc550df57f410aea52a288f6cb68bd2d6f1ada3
-- hash: f62744549c34b54e9900b31c33a4ebad5c51ac13ec44010ad2da6c6ca67fdc29

name: wire-api-federation
version: 0.1.0
Expand Down Expand Up @@ -51,6 +51,7 @@ library
, http-types
, http2
, imports
, kan-extensions
, lifted-base
, metrics-wai
, mtl
Expand Down Expand Up @@ -110,6 +111,7 @@ test-suite spec
, http-types
, http2
, imports
, kan-extensions
, lifted-base
, metrics-wai
, mtl
Expand Down
Loading