diff --git a/changelog.d/3-bug-fixes/fix-cargohold-streaming b/changelog.d/3-bug-fixes/fix-cargohold-streaming new file mode 100644 index 0000000000..53c7e53975 --- /dev/null +++ b/changelog.d/3-bug-fixes/fix-cargohold-streaming @@ -0,0 +1 @@ +Fix an issue with remote asset streaming diff --git a/services/cargohold/src/CargoHold/Federation.hs b/services/cargohold/src/CargoHold/Federation.hs index 352e5fc043..d608bb385e 100644 --- a/services/cargohold/src/CargoHold/Federation.hs +++ b/services/cargohold/src/CargoHold/Federation.hs @@ -20,11 +20,14 @@ module CargoHold.Federation where import CargoHold.App import CargoHold.Options import Control.Error +import Control.Exception (throw) import Control.Lens +import Control.Monad.Codensity import Data.Id import Data.Qualified import Imports hiding (head) import Servant.API +import Servant.Types.SourceT import Wire.API.Asset import Wire.API.Federation.API import Wire.API.Federation.API.Cargohold @@ -60,22 +63,41 @@ downloadRemoteAsset usr rkey tok = do fmap gaAvailable . executeFederated rkey $ getAsset clientRoutes ga if exists - then do - fmap (Just . toSourceIO) . executeFederated rkey $ - streamAsset clientRoutes ga + then Just <$> executeFederatedStreaming rkey (toSourceIO <$> streamAsset clientRoutes ga) else pure Nothing -executeFederated :: Remote x -> FederatorClient 'Cargohold a -> Handler a -executeFederated remote c = do +mkFederatorClientEnv :: Remote x -> Handler FederatorClientEnv +mkFederatorClientEnv remote = do loc <- view localUnit endpoint <- view (options . optFederator) >>= maybe (throwE federationNotConfigured) pure - let env = - FederatorClientEnv - { ceOriginDomain = tDomain loc, - ceTargetDomain = tDomain remote, - ceFederator = endpoint - } + pure + FederatorClientEnv + { ceOriginDomain = tDomain loc, + ceTargetDomain = tDomain remote, + ceFederator = endpoint + } + +executeFederated :: Remote x -> FederatorClient 'Cargohold a -> Handler a +executeFederated remote c = do + env <- mkFederatorClientEnv remote liftIO (runFederatorClient @'Cargohold env c) >>= either (throwE . federationErrorToWai . FederationCallFailure) pure + +executeFederatedStreaming :: + Remote x -> + FederatorClient 'Cargohold (SourceIO ByteString) -> + Handler (SourceIO ByteString) +executeFederatedStreaming remote c = do + env <- mkFederatorClientEnv remote + -- To clean up resources correctly, we exploit the Codensity wrapper around + -- StepT to embed the result of @runFederatorClientToCodensity@. This works, but + -- using this within a Servant handler has the effect of delaying exceptions to + -- the point where response streaming has already started (i.e. we have already + -- committed to a successful response). + pure $ + SourceT $ \k -> + runCodensity + (runFederatorClientToCodensity @'Cargohold env c) + (either throw (flip unSourceT k)) diff --git a/services/cargohold/test/integration/API.hs b/services/cargohold/test/integration/API.hs index a478732103..274f96f6c3 100644 --- a/services/cargohold/test/integration/API.hs +++ b/services/cargohold/test/integration/API.hs @@ -29,6 +29,7 @@ import qualified Codec.MIME.Type as MIME import Control.Exception (throw) import Control.Lens hiding (sets) import qualified Data.Aeson as Aeson +import Data.ByteString.Builder import qualified Data.ByteString.Char8 as C8 import Data.ByteString.Conversion import Data.Domain @@ -66,7 +67,12 @@ tests s = "remote" [ test s "remote download wrong domain" testRemoteDownloadWrongDomain, test s "remote download no asset" testRemoteDownloadNoAsset, - test s "remote download" testRemoteDownload + test s "remote download" (testRemoteDownload "asset content"), + test s "large remote download" $ + testRemoteDownload + ( toLazyByteString + (mconcat (replicate 20000 (byteString "hello world\n"))) + ) ] ] @@ -288,8 +294,8 @@ testRemoteDownloadNoAsset = do } ] -testRemoteDownload :: TestM () -testRemoteDownload = do +testRemoteDownload :: LByteString -> TestM () +testRemoteDownload assetContent = do assetId <- liftIO $ Id <$> nextRandom uid <- liftIO $ Id <$> nextRandom @@ -298,11 +304,11 @@ testRemoteDownload = do respond req | frRPC req == "get-asset" = pure ("application" // "json", Aeson.encode (GetAssetResponse True)) - | otherwise = pure ("application" // "octet-stream", "asset content") + | otherwise = pure ("application" // "octet-stream", assetContent) (_, reqs) <- withMockFederator respond $ do downloadAsset uid qkey () !!! do const 200 === statusCode - const (Just "asset content") === responseBody + const (Just assetContent) === responseBody localDomain <- viewFederationDomain let ga = Aeson.encode (GetAsset uid key Nothing)