From 606a5a206854828e507226d71169065b91603f43 Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Thu, 9 Jun 2022 14:51:16 +0800 Subject: [PATCH 1/4] Remove unreachable code. I had moved the place where the exception is thrown and forgot to remove the previous code. --- plutus-streaming/src/Plutus/Streaming.hs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index fb07a454d7..2f69b50bb1 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -74,14 +74,7 @@ withChainSyncEventStream socketPath networkId point consumer = do -- FIXME this comes from the config file but Cardano.Api does not expose readNetworkConfig! epochSlots = EpochSlots 40 - clientThread = do - connectToLocalNode connectInfo localNodeClientProtocols - -- the only reason connectToLocalNode can terminate successfully is if it - -- doesn't find an intersection, we report that case to the - -- consumer as an exception - throw NoIntersectionFound - - withAsync clientThread $ \a -> do + withAsync (connectToLocalNode connectInfo localNodeClientProtocols) $ \a -> do -- Make sure all exceptions in the client thread are passed to the consumer thread link a -- Run the consumer From d3bcbe3213e79931fd0ae0529107a48d39c9e6e7 Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Wed, 8 Jun 2022 13:32:59 +0800 Subject: [PATCH 2/4] Replace TChan with MVar --- plutus-streaming/src/Plutus/Streaming.hs | 40 ++++++++++-------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 2f69b50bb1..39a12c9102 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -15,7 +15,7 @@ import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsg ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync) -import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) +import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar) import Control.Exception (Exception, SomeException (SomeException), catch, throw) import GHC.Generics (Generic) import Streaming (Of, Stream) @@ -41,21 +41,17 @@ withChainSyncEventStream :: NetworkId -> -- | The point on the chain to start streaming from ChainPoint -> - -- | Stream consumer + -- | The stream consumer (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b) -> IO b withChainSyncEventStream socketPath networkId point consumer = do - -- The chain-sync client runs in a different thread and it will send us - -- block through this channel. + -- The chain-sync client runs in a different thread passing the blocks it + -- receives to the stream consumer through a MVar. The chain-sync client + -- thread and the stream consumer will each block on each other and stay + -- in lockstep. + nextBlockVar <- newEmptyMVar - -- By using newBroadcastTChan, messages can be garbage collected after - -- clients have seen them, preventing pile up. The only way to read a - -- broadcast channel is to duplicate it with dupTChan. See note at - -- `newBroadcastTChan`. - chan <- newBroadcastTChanIO - readerChannel <- atomically $ dupTChan chan - - let client = chainSyncStreamingClient point chan + let client = chainSyncStreamingClient point nextBlockVar localNodeClientProtocols = LocalNodeClientProtocols @@ -78,7 +74,7 @@ withChainSyncEventStream socketPath networkId point consumer = do -- Make sure all exceptions in the client thread are passed to the consumer thread link a -- Run the consumer - consumer $ S.repeatM $ atomically (readTChan readerChannel) + consumer $ S.repeatM $ takeMVar nextBlockVar -- Let's rethrow exceptions from the client thread unwrapped, so that the -- consumer does not have to know anything about async `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e @@ -87,26 +83,22 @@ withChainSyncEventStream socketPath networkId point consumer = do -- and runs the chain-sync mini-protocol. This client is fire-and-forget -- and does not require any control. -- --- Blocks obtained from the chain-sync mini-protocol are passed to a --- consumer through a channel. --- -- If the starting point is such that an intersection cannot be found, this -- client will throw a NoIntersectionFound exception. chainSyncStreamingClient :: ChainPoint -> - TChan (ChainSyncEvent e) -> + MVar (ChainSyncEvent e) -> ChainSyncClient e ChainPoint ChainTip IO () -chainSyncStreamingClient point chan = +chainSyncStreamingClient point nextBlockVar = ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect where onIntersect = ClientStIntersect { recvMsgIntersectFound = \_ _ -> ChainSyncClient sendRequestNext, - recvMsgIntersectNotFound = \_ -> - ChainSyncClient $ - -- There is nothing we can do here - throw NoIntersectionFound + recvMsgIntersectNotFound = + -- There is nothing we can do here + throw NoIntersectionFound } sendRequestNext = @@ -116,10 +108,10 @@ chainSyncStreamingClient point chan = ClientStNext { recvMsgRollForward = \bim ct -> ChainSyncClient $ do - atomically $ writeTChan chan (RollForward bim ct) + putMVar nextBlockVar (RollForward bim ct) sendRequestNext, recvMsgRollBackward = \cp ct -> ChainSyncClient $ do - atomically $ writeTChan chan (RollBackward cp ct) + putMVar nextBlockVar (RollBackward cp ct) sendRequestNext } From 31194d4e99faa57d6f7388597d53cb7311b7c04c Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Fri, 10 Jun 2022 13:03:23 +0800 Subject: [PATCH 3/4] Add comment about MVar choice --- plutus-streaming/src/Plutus/Streaming.hs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 39a12c9102..6ac5a890e5 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -49,6 +49,19 @@ withChainSyncEventStream socketPath networkId point consumer = do -- receives to the stream consumer through a MVar. The chain-sync client -- thread and the stream consumer will each block on each other and stay -- in lockstep. + -- + -- NOTE: choosing a MVar is a tradeoff towards simplicity. In this case a + -- (bounded) queue could perform better. Indeed a properly-sized buffer + -- can reduce the time the two threads are blocked waiting for each + -- other. The problem here is "properly-sized". A bounded queue like + -- Control.Concurrent.STM.TBQueue allows us to specify a max queue length + -- but block size can vary a lot (TODO quantify this) depending on the + -- era. We have an alternative implementation with customizable + -- (TBMQueue) but it needs to be extracted from the + -- plutus-chain-index-core package. Using a simple MVar doesn't seem to + -- slow down marconi's indexing, likely because the difference is + -- negligeable compared to existing network and IO latencies. + -- Therefore, let's stick with a MVar now and revisit later. nextBlockVar <- newEmptyMVar let client = chainSyncStreamingClient point nextBlockVar From 06ce181151c4c259debef71c058dd31f4fdc98c8 Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Mon, 13 Jun 2022 11:03:33 +0800 Subject: [PATCH 4/4] Incorporate more feedback --- plutus-streaming/src/Plutus/Streaming.hs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 6ac5a890e5..cecbe08dfc 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -56,12 +56,12 @@ withChainSyncEventStream socketPath networkId point consumer = do -- other. The problem here is "properly-sized". A bounded queue like -- Control.Concurrent.STM.TBQueue allows us to specify a max queue length -- but block size can vary a lot (TODO quantify this) depending on the - -- era. We have an alternative implementation with customizable - -- (TBMQueue) but it needs to be extracted from the + -- era. We have an alternative implementation with customizable queue + -- size (TBMQueue) but it needs to be extracted from the -- plutus-chain-index-core package. Using a simple MVar doesn't seem to -- slow down marconi's indexing, likely because the difference is - -- negligeable compared to existing network and IO latencies. - -- Therefore, let's stick with a MVar now and revisit later. + -- negligeable compared to existing network and IO latencies. Therefore, + -- let's stick with a MVar now and revisit later. nextBlockVar <- newEmptyMVar let client = chainSyncStreamingClient point nextBlockVar @@ -102,7 +102,7 @@ chainSyncStreamingClient :: ChainPoint -> MVar (ChainSyncEvent e) -> ChainSyncClient e ChainPoint ChainTip IO () -chainSyncStreamingClient point nextBlockVar = +chainSyncStreamingClient point nextChainEventVar = ChainSyncClient $ pure $ SendMsgFindIntersect [point] onIntersect where onIntersect = @@ -121,10 +121,10 @@ chainSyncStreamingClient point nextBlockVar = ClientStNext { recvMsgRollForward = \bim ct -> ChainSyncClient $ do - putMVar nextBlockVar (RollForward bim ct) + putMVar nextChainEventVar (RollForward bim ct) sendRequestNext, recvMsgRollBackward = \cp ct -> ChainSyncClient $ do - putMVar nextBlockVar (RollBackward cp ct) + putMVar nextChainEventVar (RollBackward cp ct) sendRequestNext }