Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/fix-cargohold-streaming
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue with remote asset streaming
44 changes: 33 additions & 11 deletions services/cargohold/src/CargoHold/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ module CargoHold.Federation where
import CargoHold.App
import CargoHold.Options
import Control.Error
import Control.Exception (throw)
import Control.Lens
import Control.Monad.Codensity
import Data.Id
import Data.Qualified
import Imports hiding (head)
import Servant.API
import Servant.Types.SourceT
import Wire.API.Asset
import Wire.API.Federation.API
import Wire.API.Federation.API.Cargohold
Expand Down Expand Up @@ -60,22 +63,41 @@ downloadRemoteAsset usr rkey tok = do
fmap gaAvailable . executeFederated rkey $
getAsset clientRoutes ga
if exists
then do
fmap (Just . toSourceIO) . executeFederated rkey $
streamAsset clientRoutes ga
then Just <$> executeFederatedStreaming rkey (toSourceIO <$> streamAsset clientRoutes ga)
else pure Nothing

executeFederated :: Remote x -> FederatorClient 'Cargohold a -> Handler a
executeFederated remote c = do
mkFederatorClientEnv :: Remote x -> Handler FederatorClientEnv
mkFederatorClientEnv remote = do
loc <- view localUnit
endpoint <-
view (options . optFederator)
>>= maybe (throwE federationNotConfigured) pure
let env =
FederatorClientEnv
{ ceOriginDomain = tDomain loc,
ceTargetDomain = tDomain remote,
ceFederator = endpoint
}
pure
FederatorClientEnv
{ ceOriginDomain = tDomain loc,
ceTargetDomain = tDomain remote,
ceFederator = endpoint
}

executeFederated :: Remote x -> FederatorClient 'Cargohold a -> Handler a
executeFederated remote c = do
env <- mkFederatorClientEnv remote
liftIO (runFederatorClient @'Cargohold env c)
>>= either (throwE . federationErrorToWai . FederationCallFailure) pure

executeFederatedStreaming ::
Remote x ->
FederatorClient 'Cargohold (SourceIO ByteString) ->
Handler (SourceIO ByteString)
executeFederatedStreaming remote c = do
env <- mkFederatorClientEnv remote
-- To clean up resources correctly, we exploit the Codensity wrapper around
-- StepT to embed the result of @runFederatorClientToCodensity@. This works, but
-- using this within a Servant handler has the effect of delaying exceptions to
-- the point where response streaming has already started (i.e. we have already
-- committed to a successful response).
pure $
SourceT $ \k ->
runCodensity
(runFederatorClientToCodensity @'Cargohold env c)
(either throw (flip unSourceT k))
16 changes: 11 additions & 5 deletions services/cargohold/test/integration/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import qualified Codec.MIME.Type as MIME
import Control.Exception (throw)
import Control.Lens hiding (sets)
import qualified Data.Aeson as Aeson
import Data.ByteString.Builder
import qualified Data.ByteString.Char8 as C8
import Data.ByteString.Conversion
import Data.Domain
Expand Down Expand Up @@ -66,7 +67,12 @@ tests s =
"remote"
[ test s "remote download wrong domain" testRemoteDownloadWrongDomain,
test s "remote download no asset" testRemoteDownloadNoAsset,
test s "remote download" testRemoteDownload
test s "remote download" (testRemoteDownload "asset content"),
test s "large remote download" $
testRemoteDownload
( toLazyByteString
(mconcat (replicate 20000 (byteString "hello world\n")))
)
]
]

Expand Down Expand Up @@ -288,8 +294,8 @@ testRemoteDownloadNoAsset = do
}
]

testRemoteDownload :: TestM ()
testRemoteDownload = do
testRemoteDownload :: LByteString -> TestM ()
testRemoteDownload assetContent = do
assetId <- liftIO $ Id <$> nextRandom
uid <- liftIO $ Id <$> nextRandom

Expand All @@ -298,11 +304,11 @@ testRemoteDownload = do
respond req
| frRPC req == "get-asset" =
pure ("application" // "json", Aeson.encode (GetAssetResponse True))
| otherwise = pure ("application" // "octet-stream", "asset content")
| otherwise = pure ("application" // "octet-stream", assetContent)
(_, reqs) <- withMockFederator respond $ do
downloadAsset uid qkey () !!! do
const 200 === statusCode
const (Just "asset content") === responseBody
const (Just assetContent) === responseBody

localDomain <- viewFederationDomain
let ga = Aeson.encode (GetAsset uid key Nothing)
Expand Down