Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
Replace TChan with MVar
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini committed Jun 10, 2022
1 parent 24e1f32 commit 794c6a7
Showing 1 changed file with 16 additions and 24 deletions.
40 changes: 16 additions & 24 deletions plutus-streaming/src/Plutus/Streaming.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg
ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, SomeException (SomeException), catch, throw)
import GHC.Generics (Generic)
import Streaming (Of, Stream)
Expand All @@ -41,21 +41,17 @@ withChainSyncEventStream ::
NetworkId ->
-- | The point on the chain to start streaming from
ChainPoint ->
-- | Stream consumer
-- | The stream consumer
(Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) ->
IO b
withChainSyncEventStream socketPath networkId point consumer = do
-- The chain-sync client runs in a different thread and it will send us
-- block through this channel.
-- The chain-sync client runs in a different thread passing the blocks it
-- receives to the stream consumer through a MVar. The chain-sync client
-- thread and the stream consumer will each block on each other and stay
-- in lockstep.
nextBlockVar <- newEmptyMVar

-- By using newBroadcastTChan, messages can be garbage collected after
-- clients have seen them, preventing pile up. The only way to read a
-- broadcast channel is to duplicate it with dupTChan. See note at
-- `newBroadcastTChan`.
chan <- newBroadcastTChanIO
readerChannel <- atomically $ dupTChan chan

let client = chainSyncStreamingClient point chan
let client = chainSyncStreamingClient point nextBlockVar

localNodeClientProtocols =
LocalNodeClientProtocols
Expand All @@ -78,7 +74,7 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- Make sure all exceptions in the client thread are passed to the consumer thread
link a
-- Run the consumer
consumer $ S.repeatM $ atomically (readTChan readerChannel)
consumer $ S.repeatM $ takeMVar nextBlockVar
-- Let's rethrow exceptions from the client thread unwrapped, so that the
-- consumer does not have to know anything about async
`catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e
Expand All @@ -87,26 +83,22 @@ withChainSyncEventStream socketPath networkId point consumer = do
-- and runs the chain-sync mini-protocol. This client is fire-and-forget
-- and does not require any control.
--
-- Blocks obtained from the chain-sync mini-protocol are passed to a
-- consumer through a channel.
--
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
ChainPoint ->
TChan (ChainSyncEvent e) ->
MVar (ChainSyncEvent e) ->
ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient point chan =
chainSyncStreamingClient point nextBlockVar =
ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect
where
onIntersect =
ClientStIntersect
{ recvMsgIntersectFound = \_ _ ->
ChainSyncClient sendRequestNext,
recvMsgIntersectNotFound = \_ ->
ChainSyncClient $
-- There is nothing we can do here
throw NoIntersectionFound
recvMsgIntersectNotFound =
-- There is nothing we can do here
throw NoIntersectionFound
}

sendRequestNext =
Expand All @@ -116,10 +108,10 @@ chainSyncStreamingClient point chan =
ClientStNext
{ recvMsgRollForward = \bim ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollForward bim ct)
putMVar nextBlockVar (RollForward bim ct)
sendRequestNext,
recvMsgRollBackward = \cp ct ->
ChainSyncClient $ do
atomically $ writeTChan chan (RollBackward cp ct)
putMVar nextBlockVar (RollBackward cp ct)
sendRequestNext
}

0 comments on commit 794c6a7

Please sign in to comment.