Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workaround of sync for SIO #152

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ run cconf@ClientConfig{..} conf client = do
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
Just outobj -> sendRequest conf ctx strm outobj False
rsp <- getResponse strm
x <- processResponse rsp
adjustRxWindow ctx strm
Expand Down Expand Up @@ -115,7 +115,7 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
Just outobj -> sendRequest conf ctx strm outobj True
return (streamNumber strm, strm)
get = getResponse
create = openOddStreamWait ctx
Expand Down Expand Up @@ -208,8 +208,8 @@ makeStream ctx@Context{..} scheme auth (Request req) = do
(_sid, newstrm) <- openOddStreamWait ctx
return (newstrm, Just req')

sendRequest :: Config -> Context -> Stream -> OutObj -> IO ()
sendRequest Config{..} ctx@Context{..} strm OutObj{..} = do
sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config{..} ctx@Context{..} strm OutObj{..} io = do
let sid = streamNumber strm
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
Expand All @@ -230,17 +230,22 @@ sendRequest Config{..} ctx@Context{..} strm OutObj{..} = do
let next = nextForStreaming q
return (Just next, Just q)
let ot = OHeader outObjHeaders mnext outObjTrailers
(pop, out) <- makeOutput strm ot
atomically $ do
if io
then do
let out = makeOutputIO ctx strm ot
pushOutput sid out
else do
(pop, out) <- makeOutput strm ot
pushOutput sid out
lc <- newLoopCheck strm mtbq
forkManaged threadManager label $ syncWithSender' ctx pop lc
where
label = "H2 request sender for stream " ++ show (streamNumber strm)
pushOutput sid out = atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
writeTVar outputQStreamID (sid + 2)
enqueueOutputSTM outputQ out
lc <- newLoopCheck strm mtbq
forkManaged threadManager label $
syncWithSender' ctx pop lc
where
label = "H2 request sender for stream " ++ show (streamNumber strm)

sendStreaming
:: Context
Expand Down
10 changes: 3 additions & 7 deletions Network/HTTP2/H2/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,14 @@ frameSender
else case otyp of
OHeader hdr mnext tlrmkr -> do
(off', mout') <- outputHeader strm hdr mnext tlrmkr sync off
case mout' of
Nothing -> sync Done
Just out' -> sync $ Cont out'
sync mout'
return off'
_ -> do
sws <- getStreamWindowSize strm
cws <- getConnectionWindowSize ctx -- not 0
let lim = min cws sws
(off', mout') <- output out off lim
case mout' of
Nothing -> sync Done
Just out' -> sync $ Cont out'
sync mout'
return off'

resetStream :: Stream -> ErrorCode -> E.SomeException -> IO ()
Expand All @@ -178,7 +174,7 @@ frameSender
-> [Header]
-> Maybe DynaNext
-> TrailersMaker
-> (Sync -> IO ())
-> (Maybe Output -> IO ())
-> Offset
-> IO (Offset, Maybe Output)
outputHeader strm hdr mnext tlrmkr sync off0 = do
Expand Down
44 changes: 35 additions & 9 deletions Network/HTTP2/H2/Sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module Network.HTTP2.H2.Sync (
syncWithSender,
syncWithSender',
makeOutput,
makeOutputIO,
enqueueOutputSIO,
) where

import Control.Concurrent
Expand All @@ -25,26 +27,50 @@ syncWithSender
-> LoopCheck
-> IO ()
syncWithSender ctx@Context{..} strm otyp lc = do
(var, out) <- makeOutput strm otyp
(pop, out) <- makeOutput strm otyp
enqueueOutput outputQ out
syncWithSender' ctx var lc
syncWithSender' ctx pop lc

makeOutput :: Stream -> OutputType -> IO (MVar Sync, Output)
makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput strm otyp = do
var <- newEmptyMVar
let out =
let push mout = case mout of
Nothing -> putMVar var Done
Just ot -> putMVar var $ Cont ot
pop = takeMVar var
out =
Output
{ outputStream = strm
, outputType = otyp
, outputSync = putMVar var
, outputSync = push
}
return (var, out)
return (pop, out)

syncWithSender' :: Context -> MVar Sync -> LoopCheck -> IO ()
syncWithSender' Context{..} var lc = loop
makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO Context{..} strm otyp = out
where
push mout = case mout of
Nothing -> return ()
-- Sender enqueues output again ignoring
-- the stream TX window.
Just ot -> enqueueOutput outputQ ot
out =
Output
{ outputStream = strm
, outputType = otyp
, outputSync = push
}

enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO ctx@Context{..} strm otyp = do
let out = makeOutputIO ctx strm otyp
enqueueOutput outputQ out

syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context{..} pop lc = loop
where
loop = do
s <- takeMVar var
s <- pop
case s of
Done -> return ()
Cont newout -> do
Expand Down
2 changes: 1 addition & 1 deletion Network/HTTP2/H2/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ instance Show Stream where
data Output = Output
{ outputStream :: Stream
, outputType :: OutputType
, outputSync :: Sync -> IO ()
, outputSync :: Maybe Output -> IO ()
}

data OutputType
Expand Down
5 changes: 2 additions & 3 deletions Network/HTTP2/Server/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ runIO sconf conf@Config{..} action = do
when ok $ do
inpQ <- newTQueueIO
let lnch _ strm inpObj = atomically $ writeTQueue inpQ (strm, inpObj)
ctx@Context{..} <- setup sconf conf lnch
ctx <- setup sconf conf lnch
let get = do
(strm, inpObj) <- atomically $ readTQueue inpQ
return (strm, Request inpObj)
Expand All @@ -85,8 +85,7 @@ runIO sconf conf@Config{..} action = do
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
otyp = OHeader outObjHeaders (Just next) outObjTrailers
(_, out) <- makeOutput strm otyp
enqueueOutput outputQ out
enqueueOutputSIO ctx strm otyp
_ -> error "Response other than OutBodyBuilder is not supported"
serverIO =
ServerIO
Expand Down
Loading