From eba253b12fc8d0b8d630738764d3bceca30633c5 Mon Sep 17 00:00:00 2001 From: Andrea Bedini Date: Tue, 7 Jun 2022 15:26:24 +0800 Subject: [PATCH] [PLT-87] marconi: Log progress of synced blocks (#489) * Make the indexer use plutus-streaming * Separate Marconi executable from plutus-chain-index Rename plutus-indexer to marconi * Add logging to marconi * Fix typo * Better time formatting * Deal with malformed block hashes * Catch NoIntersectionFound and warn the user * Simplify logging logic * Improve display of chain point * Improve display of chain point [take two] * Remove duplicate dependency * Improve logging output If the user started processing the chain at a point close to the end, the messages made it look like the synchronisation was already almost finished ("Synchronising (99.96%)"), risking confusing the user. Thats indeed the relative point on the chain that is currently being processed and has nothing to do with an ETA. This patch reworks the message to clear the meaning of that percentage. E.g. "Synchronising. Current slot 57848748 out of 57901784 (99.91%)" * Use iohk-monitoring-framework * Apply suggestions from code review Co-authored-by: koslambrou * Adjust logging format * Display last processed block, not chain tip * Rework imports and extensions Co-authored-by: koslambrou --- plutus-chain-index/app/Indexer.hs | 140 ---------------- plutus-chain-index/app/Marconi.hs | 150 ++++++++++++++++++ .../{src => app}/Marconi/Index/Datum.hs | 0 plutus-chain-index/app/Marconi/Logging.hs | 120 ++++++++++++++ plutus-chain-index/app/Marconi/Orphans.hs | 26 +++ plutus-chain-index/plutus-chain-index.cabal | 88 +++++----- plutus-streaming/src/Plutus/Streaming.hs | 7 +- 7 files changed, 348 insertions(+), 183 deletions(-) delete mode 100644 plutus-chain-index/app/Indexer.hs create mode 100644 plutus-chain-index/app/Marconi.hs rename plutus-chain-index/{src => app}/Marconi/Index/Datum.hs (100%) create mode 100644 plutus-chain-index/app/Marconi/Logging.hs create mode 100644 plutus-chain-index/app/Marconi/Orphans.hs diff --git a/plutus-chain-index/app/Indexer.hs b/plutus-chain-index/app/Indexer.hs deleted file mode 100644 index 894105f0f1..0000000000 --- a/plutus-chain-index/app/Indexer.hs +++ /dev/null @@ -1,140 +0,0 @@ -{-# LANGUAGE ExplicitForAll #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE TupleSections #-} - -module Main where - -import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), BlockNo (BlockNo), CardanoMode, - ChainPoint (ChainPoint), Hash, NetworkId (Mainnet), SlotNo (SlotNo), chainPointToSlotNo, - deserialiseFromRawBytesHex, proxyToAsType) -import Cardano.Api qualified as C -import Cardano.BM.Trace (nullTracer) -import Cardano.Protocol.Socket.Client (ChainSyncEvent (Resume, RollBackward, RollForward), runChainSync) -import Control.Concurrent (threadDelay) -import Control.Lens.Operators ((^.)) -import Control.Monad (forever, when) -import Data.IORef (IORef, newIORef, readIORef, writeIORef) -import Data.List (findIndex) -import Data.Map (assocs) -import Data.Maybe (fromJust, fromMaybe) -import Data.Proxy (Proxy (Proxy)) -import Data.Text (pack) -import Data.Text.Encoding (encodeUtf8) -import Index.VSplit qualified as Ix -import Ledger.TimeSlot (SlotConfig (..)) -import Ledger.Tx.CardanoAPI (withIsCardanoEra) -import Marconi.Index.Datum (DatumIndex) -import Marconi.Index.Datum qualified as Ix -import Options.Applicative (Parser, execParser, fullDesc, header, help, helper, info, long, metavar, progDesc, - strOption, (<**>)) -import Plutus.ChainIndex.Tx (ChainIndexTx (..)) -import Plutus.Contract.CardanoAPI (fromCardanoTx) -import Plutus.Script.Utils.V1.Scripts (Datum, DatumHash) - -{- | This executable is meant to exercise a set of indexers (for now datumhash -> datum) - against the mainnet (meant to be used for testing). - - In case you want to access the results of the datumhash indexer you need to query - the resulting database: - $ sqlite3 datums.sqlite - > select slotNo, datumHash, datum from kv_datumhsh_datum where slotNo = 39920450; - 39920450|679a55b523ff8d61942b2583b76e5d49498468164802ef1ebe513c685d6fb5c2|X(002f9787436835852ea78d3c45fc3d436b324184 --} - --- Options -data Options = Options - { socketPath :: FilePath - , dbPath :: FilePath - } - -options :: Parser Options -options = Options - <$> strOption - ( long "socket" - <> metavar "SOCKET" - <> help "Path to node socket." ) - <*> strOption - ( long "database" - <> metavar "DATABASE" - <> help "Path to database." ) - --- We only care about the mainnet -slotConfig :: SlotConfig -slotConfig = - SlotConfig - { scSlotZeroTime = 1596059091000 - , scSlotLength = 1000 - } - -networkId :: NetworkId -networkId = Mainnet - --- We don't generally need to sync blocks earlier than the Goguen era (other than --- testing for memory leaks) so we may want to start synchronising from a slot that --- is closer to Goguen era. -closeToGoguen :: ChainPoint -closeToGoguen = - ChainPoint - (SlotNo 39795032) - (fromJust $ parseHash "3e6f6450f85962d651654ee66091980b2332166f5505fd10b97b0520c9efac90") - -parseHash :: String -> Maybe (Hash BlockHeader) -parseHash hash = - deserialiseFromRawBytesHex (proxyToAsType Proxy) (encodeUtf8 $ pack hash) - -getDatums :: BlockInMode CardanoMode -> [(SlotNo, (DatumHash, Datum))] -getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) era) = withIsCardanoEra era $ concatMap (go era) txs - where - go :: C.IsCardanoEra era - => C.EraInMode era C.CardanoMode - -> C.Tx era - -> [(SlotNo, (DatumHash, Datum))] - go era' tx = - let hashes = either (const []) (assocs . _citxData) $ fromCardanoTx era' tx - in map (slotNo,) hashes - -processBlock :: IORef DatumIndex -> ChainSyncEvent -> IO () -processBlock ixref = \case - -- Not really supported - Resume point -> putStrLn ("resume " <> show point) >> pure () - RollForward blk@(BlockInMode (Block (BlockHeader slotNo _ blockNo@(BlockNo b)) _txs) _era) _tip -> do - when (b `rem` 1000 == 0) $ - putStrLn $ show slotNo <> " / " <> show blockNo - ix <- readIORef ixref - ix' <- Ix.insert (getDatums blk) ix - writeIORef ixref ix' - RollBackward point tip -> do - putStrLn ("rollback to " <> show tip) - rollbackToPoint point ixref - -rollbackToPoint - :: ChainPoint -> IORef DatumIndex -> IO () -rollbackToPoint point ixref = do - ix <- readIORef ixref - events <- Ix.getEvents (ix ^. Ix.storage) - let ix' = fromMaybe ix $ rollbackOffset events ix - writeIORef ixref ix' - where - rollbackOffset :: [Ix.Event] -> DatumIndex -> Maybe DatumIndex - rollbackOffset events ix = do - slot <- chainPointToSlotNo point - offset <- findIndex (any (\(s, _) -> s < slot)) events - Ix.rewind offset ix - -main :: IO () -main = do - options' <- execParser opts - tix <- Ix.open (dbPath options') (Ix.Depth 2160) >>= newIORef - _ <- runChainSync (socketPath options') - nullTracer - slotConfig - networkId - [closeToGoguen] - (processBlock tix) - forever $ threadDelay 1000000000 - where - opts = info (options <**> helper) - ( fullDesc - <> progDesc "Synchronise datums with mainnet" - <> header "indexer - an indexing proof of concept" ) diff --git a/plutus-chain-index/app/Marconi.hs b/plutus-chain-index/app/Marconi.hs new file mode 100644 index 0000000000..cefa011447 --- /dev/null +++ b/plutus-chain-index/app/Marconi.hs @@ -0,0 +1,150 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} + +module Main where + +import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, + ChainPoint (ChainPoint), EraInMode, Hash, IsCardanoEra, NetworkId (Mainnet, Testnet), + NetworkMagic (NetworkMagic), SlotNo (SlotNo), Tx, chainPointToSlotNo, deserialiseFromRawBytesHex, + proxyToAsType) +import Cardano.BM.Setup (withTrace) +import Cardano.BM.Trace (logError) +import Cardano.BM.Tracing (defaultConfigStdout) +import Control.Exception (catch) +import Control.Lens.Operators ((^.)) +import Data.ByteString.Char8 qualified as C8 +import Data.List (findIndex) +import Data.Map (assocs) +import Data.Maybe (fromJust, fromMaybe) +import Data.Proxy (Proxy (Proxy)) +import Index.VSplit qualified as Ix +import Ledger.Tx.CardanoAPI (withIsCardanoEra) +import Marconi.Index.Datum (DatumIndex) +import Marconi.Index.Datum qualified as Ix +import Marconi.Logging (logging) +import Options.Applicative (Parser, auto, execParser, flag', help, helper, info, long, maybeReader, metavar, option, + readerError, strOption, (<**>), (<|>)) +import Plutus.ChainIndex.Tx (ChainIndexTx (..)) +import Plutus.Contract.CardanoAPI (fromCardanoTx) +import Plutus.Script.Utils.V1.Scripts (Datum, DatumHash) +import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound), + withChainSyncEventStream) +import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>)) +import Prettyprinter.Render.Text (renderStrict) +import Streaming.Prelude qualified as S + +-- | This executable is meant to exercise a set of indexers (for now datumhash -> datum) +-- against the mainnet (meant to be used for testing). +-- +-- In case you want to access the results of the datumhash indexer you need to query +-- the resulting database: +-- $ sqlite3 datums.sqlite +-- > select slotNo, datumHash, datum from kv_datumhsh_datum where slotNo = 39920450; +-- 39920450|679a55b523ff8d61942b2583b76e5d49498468164802ef1ebe513c685d6fb5c2|X(002f9787436835852ea78d3c45fc3d436b324184 +data Options = Options + { optionsSocketPath :: String, + optionsNetworkId :: NetworkId, + optionsChainPoint :: ChainPoint, + optionsDatabasePath :: FilePath + } + deriving (Show) + +parseOptions :: IO Options +parseOptions = execParser $ info (optionsParser <**> helper) mempty + +optionsParser :: Parser Options +optionsParser = + Options + <$> strOption (long "socket-path" <> help "Path to node socket.") + <*> networkIdParser + <*> chainPointParser + <*> strOption (long "database-path" <> help "Path to database.") + +networkIdParser :: Parser NetworkId +networkIdParser = + pMainnet <|> pTestnet + where + pMainnet = + flag' + Mainnet + ( long "mainnet" + <> help "Use the mainnet magic id." + ) + + pTestnet = + Testnet . NetworkMagic + <$> option + auto + ( long "testnet-magic" + <> metavar "NATURAL" + <> help "Specify a testnet magic id." + ) + +chainPointParser :: Parser ChainPoint +chainPointParser = + pure chainPointCloseToGoguen + <|> ( ChainPoint + <$> option (SlotNo <$> auto) (long "slot-no" <> metavar "SLOT-NO") + <*> option + (maybeReader maybeParseHashBlockHeader <|> readerError "Malformed block hash") + (long "block-hash" <> metavar "BLOCK-HASH") + ) + where + -- We don't generally need to sync blocks earlier than the Goguen era (other than + -- testing for memory leaks) so we may want to start synchronising from a slot that + -- is closer to Goguen era. + chainPointCloseToGoguen = + ChainPoint + (SlotNo 39795032) + (fromJust $ maybeParseHashBlockHeader "3e6f6450f85962d651654ee66091980b2332166f5505fd10b97b0520c9efac90") + +getDatums :: BlockInMode CardanoMode -> [(SlotNo, (DatumHash, Datum))] +getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) era) = withIsCardanoEra era $ concatMap (go era) txs + where + go :: + IsCardanoEra era => + EraInMode era CardanoMode -> + Tx era -> + [(SlotNo, (DatumHash, Datum))] + go era' tx = + let hashes = either (const []) (assocs . _citxData) $ fromCardanoTx era' tx + in map (slotNo,) hashes + +main :: IO () +main = do + Options {optionsSocketPath, optionsNetworkId, optionsChainPoint, optionsDatabasePath} <- parseOptions + + let initial :: IO DatumIndex + initial = Ix.open optionsDatabasePath (Ix.Depth 2160) + + step :: DatumIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO DatumIndex + step index (RollForward blk _ct) = + Ix.insert (getDatums blk) index + step index (RollBackward cp _ct) = do + events <- Ix.getEvents (index ^. Ix.storage) + return $ + fromMaybe index $ do + slot <- chainPointToSlotNo cp + offset <- findIndex (any (\(s, _) -> s < slot)) events + Ix.rewind offset index + + finish :: DatumIndex -> IO () + finish _index = pure () -- Nothing to do here, perhaps we should use this to close the database? + c <- defaultConfigStdout + + withTrace c "marconi" $ \trace -> + withChainSyncEventStream + optionsSocketPath + optionsNetworkId + optionsChainPoint + (S.foldM_ step initial finish . logging trace) + `catch` \NoIntersectionFound -> + logError trace $ + renderStrict $ + layoutPretty defaultLayoutOptions $ + "No intersection found when looking for the chain point" <+> pretty optionsChainPoint <> "." + <+> "Please check the slot number and the block hash do belong to the chain" + +maybeParseHashBlockHeader :: String -> Maybe (Hash BlockHeader) +maybeParseHashBlockHeader = deserialiseFromRawBytesHex (proxyToAsType Proxy) . C8.pack diff --git a/plutus-chain-index/src/Marconi/Index/Datum.hs b/plutus-chain-index/app/Marconi/Index/Datum.hs similarity index 100% rename from plutus-chain-index/src/Marconi/Index/Datum.hs rename to plutus-chain-index/app/Marconi/Index/Datum.hs diff --git a/plutus-chain-index/app/Marconi/Logging.hs b/plutus-chain-index/app/Marconi/Logging.hs new file mode 100644 index 0000000000..ad262e0d06 --- /dev/null +++ b/plutus-chain-index/app/Marconi/Logging.hs @@ -0,0 +1,120 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Marconi.Logging (logging) where + +import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, + ChainPoint (ChainPoint), ChainTip (ChainTip), SlotNo (SlotNo)) +import Cardano.BM.Trace (Trace, logInfo) +import Control.Monad (when) +import Data.IORef (IORef, modifyIORef', newIORef, readIORef) +import Data.Text (Text) +import Data.Time (NominalDiffTime, UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) +import Marconi.Orphans () +import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward)) +import Prettyprinter (Pretty (pretty), defaultLayoutOptions, layoutPretty, (<+>)) +import Prettyprinter.Render.Text (renderStrict) +import Streaming (Of, Stream, effect) +import Streaming.Prelude qualified as S +import Text.Printf (printf) + +data SyncStats = SyncStats + { -- | Number of applied blocks since last message + syncStatsNumBlocks :: !Int, + -- | Number of rollbacks + syncStatsNumRollbacks :: !Int, + -- | Timestamp of last printed message + syncStatsLastMessage :: !(Maybe UTCTime) + } + +logging :: + Trace IO Text -> + Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> + Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r +logging tracer s = effect $ do + stats <- newIORef (SyncStats 0 0 Nothing) + return $ S.chain (update stats) s + where + minSecondsBetweenMsg :: NominalDiffTime + minSecondsBetweenMsg = 10 + + update :: IORef SyncStats -> ChainSyncEvent (BlockInMode CardanoMode) -> IO () + update statsRef (RollForward bim ct) = do + let cp = case bim of (BlockInMode (Block (BlockHeader slotNo hash _blockNo) _txs) _eim) -> ChainPoint slotNo hash + modifyIORef' statsRef $ \stats -> + stats {syncStatsNumBlocks = syncStatsNumBlocks stats + 1} + printMessage statsRef cp ct + update statsRef (RollBackward cp ct) = do + modifyIORef' statsRef $ \stats -> + stats {syncStatsNumRollbacks = syncStatsNumRollbacks stats + 1} + printMessage statsRef cp ct + + printMessage statsRef cp ct = do + SyncStats {syncStatsNumBlocks, syncStatsNumRollbacks, syncStatsLastMessage} <- readIORef statsRef + + now <- getCurrentTime + + let timeSinceLastMsg = diffUTCTime now <$> syncStatsLastMessage + + let blocksMsg = case timeSinceLastMsg of + Nothing -> id + Just t -> \k -> + "Processed" + <+> pretty syncStatsNumBlocks + <+> "blocks in the last" + <+> pretty (formatTime defaultTimeLocale "%s" t) + <+> "seconds" + <+> let rate = fromIntegral syncStatsNumBlocks / realToFrac t :: Double + in pretty (printf "(%.0f blocks/sec)." rate :: String) + <+> k + + let rollbackMsg = case timeSinceLastMsg of + Nothing -> id + Just t -> \k -> + ( case syncStatsNumRollbacks of + 0 -> "No" + _ -> pretty syncStatsNumRollbacks + ) + <+> "rollbacks in the last" + <+> pretty (formatTime defaultTimeLocale "%s" t) + <+> "seconds." + <+> k + + let syncMsg = case (cp, ct) of + (ChainPoint (SlotNo chainPointSlot) _, ChainTip (SlotNo chainTipSlot) _header _blockNo) + -- TODO: MAGIC number here. Is there a better number? + -- 100 represents the number of slots before the + -- node where we consider the chain-index to be synced. + | chainTipSlot - chainPointSlot < 100 -> + "Synchronised." + (ChainPoint (SlotNo chainPointSlotNo) _, ChainTip (SlotNo chainTipSlotNo) _header _blockNo) -> + let pct = (100 :: Double) * fromIntegral chainPointSlotNo / fromIntegral chainTipSlotNo + in pretty + ( printf + "Synchronising. Current slot %d out of %d (%0.2f%%)." + chainPointSlotNo + chainTipSlotNo + pct :: + String + ) + _ -> "Starting." + + let shouldPrint = case timeSinceLastMsg of + Nothing -> True + Just t + | t > minSecondsBetweenMsg -> True + | otherwise -> False + + when shouldPrint $ do + logInfo tracer $ + renderStrict $ + layoutPretty defaultLayoutOptions $ + syncMsg <+> (blocksMsg $ rollbackMsg $ "Last block processed" <+> pretty cp <> ".") + modifyIORef' statsRef $ \stats -> + stats + { syncStatsNumBlocks = 0, + syncStatsNumRollbacks = 0, + syncStatsLastMessage = Just now + } diff --git a/plutus-chain-index/app/Marconi/Orphans.hs b/plutus-chain-index/app/Marconi/Orphans.hs new file mode 100644 index 0000000000..996064f606 --- /dev/null +++ b/plutus-chain-index/app/Marconi/Orphans.hs @@ -0,0 +1,26 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Marconi.Orphans where + +import Cardano.Api (BlockHeader, BlockNo (BlockNo), ChainPoint (..), ChainTip (ChainTip, ChainTipAtGenesis), Hash, + SlotNo (SlotNo), serialiseToRawBytesHexText) +import Prettyprinter (Pretty (pretty), (<+>)) + +instance Pretty ChainTip where + pretty ChainTipAtGenesis = "ChainTipAtGenesis" + pretty (ChainTip sn ha bn) = "ChainTip(" <> pretty sn <> "," <+> pretty ha <> "," <+> pretty bn <> ")" + +instance Pretty ChainPoint where + pretty ChainPointAtGenesis = "ChainPointAtGenesis" + pretty (ChainPoint sn ha) = "ChainPoint(" <> pretty sn <> "," <+> pretty ha <> ")" + +instance Pretty (Hash BlockHeader) where + pretty hash = "BlockHash" <+> pretty (serialiseToRawBytesHexText hash) + +instance Pretty SlotNo where + pretty (SlotNo n) = "Slot" <+> pretty n + +instance Pretty BlockNo where + pretty (BlockNo bn) = "BlockNo" <+> pretty bn diff --git a/plutus-chain-index/plutus-chain-index.cabal b/plutus-chain-index/plutus-chain-index.cabal index 8ff2322405..5e1b7b98c2 100644 --- a/plutus-chain-index/plutus-chain-index.cabal +++ b/plutus-chain-index/plutus-chain-index.cabal @@ -37,40 +37,35 @@ library Plutus.ChainIndex.Lib Plutus.ChainIndex.Logging Plutus.ChainIndex.SyncStats - Marconi.Index.Datum other-modules: Control.Concurrent.STM.TBMQueue hs-source-dirs: src build-depends: - plutus-ledger -any, - plutus-ledger-api -any, - plutus-chain-index-core -any, - plutus-script-utils -any, - freer-extras -any + plutus-ledger, + plutus-ledger-api, + plutus-chain-index-core, + freer-extras build-depends: - aeson -any, - async -any, + aeson, + async, base >=4.7 && <5, - beam-sqlite -any, - beam-migrate -any, - cardano-api -any, - contra-tracer -any, - clock -any, - data-default -any, - freer-simple -any, - iohk-monitoring -any, - lens -any, - optparse-applicative -any, - ouroboros-network -any, + beam-sqlite, + beam-migrate, + cardano-api, + contra-tracer, + clock, + data-default, + freer-simple, + iohk-monitoring, + lens, + optparse-applicative, + ouroboros-network, prettyprinter >=1.1.0.1, - resource-pool -any, - sqlite-simple -any, - stm -any, - time-units -any, - yaml -any, - serialise -any, - hysterical-screams -any, - bytestring -any, + resource-pool, + sqlite-simple, + stm, + time-units, + yaml, executable plutus-chain-index main-is: Main.hs @@ -83,11 +78,15 @@ executable plutus-chain-index -Wno-missing-import-lists -Wredundant-constraints -O0 build-depends: base >=4.9 && <5, - plutus-chain-index -any + plutus-chain-index -executable plutus-indexer - main-is: Indexer.hs +executable marconi + main-is: Marconi.hs hs-source-dirs: app + other-modules: + Marconi.Index.Datum + Marconi.Logging + Marconi.Orphans default-language: Haskell2010 default-extensions: ImportQualifiedPost ghc-options: @@ -96,14 +95,21 @@ executable plutus-indexer -Wno-missing-import-lists -Wredundant-constraints -O0 build-depends: base >=4.9 && <5, - plutus-chain-index -any, - iohk-monitoring -any, - cardano-api -any, - plutus-ledger -any, - plutus-chain-index-core -any, - plutus-script-utils -any, - containers -any, - lens -any, - hysterical-screams -any, - text -any, - optparse-applicative -any, + bytestring, + cardano-api, + containers, + hysterical-screams, + iohk-monitoring, + lens, + optparse-applicative, + plutus-chain-index, + plutus-chain-index-core, + plutus-ledger, + plutus-script-utils, + plutus-streaming, + prettyprinter, + serialise, + sqlite-simple, + streaming, + text, + time, diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 6acaa6eaa7..fb07a454d7 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -14,9 +14,9 @@ import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (Chain import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext), ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) -import Control.Concurrent.Async (link, withAsync) +import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync) import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) -import Control.Exception (Exception, throw) +import Control.Exception (Exception, SomeException (SomeException), catch, throw) import GHC.Generics (Generic) import Streaming (Of, Stream) import Streaming.Prelude qualified as S @@ -86,6 +86,9 @@ withChainSyncEventStream socketPath networkId point consumer = do link a -- Run the consumer consumer $ S.repeatM $ atomically (readTChan readerChannel) + -- Let's rethrow exceptions from the client thread unwrapped, so that the + -- consumer does not have to know anything about async + `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e -- | `chainSyncStreamingClient` is the client that connects to a local node -- and runs the chain-sync mini-protocol. This client is fire-and-forget