diff --git a/plutus-chain-index/app/Indexer.hs b/plutus-chain-index/app/Indexer.hs deleted file mode 100644 index 894105f0f1..0000000000 --- a/plutus-chain-index/app/Indexer.hs +++ /dev/null @@ -1,140 +0,0 @@ -{-# LANGUAGE ExplicitForAll #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE TupleSections #-} - -module Main where - -import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), BlockNo (BlockNo), CardanoMode, - ChainPoint (ChainPoint), Hash, NetworkId (Mainnet), SlotNo (SlotNo), chainPointToSlotNo, - deserialiseFromRawBytesHex, proxyToAsType) -import Cardano.Api qualified as C -import Cardano.BM.Trace (nullTracer) -import Cardano.Protocol.Socket.Client (ChainSyncEvent (Resume, RollBackward, RollForward), runChainSync) -import Control.Concurrent (threadDelay) -import Control.Lens.Operators ((^.)) -import Control.Monad (forever, when) -import Data.IORef (IORef, newIORef, readIORef, writeIORef) -import Data.List (findIndex) -import Data.Map (assocs) -import Data.Maybe (fromJust, fromMaybe) -import Data.Proxy (Proxy (Proxy)) -import Data.Text (pack) -import Data.Text.Encoding (encodeUtf8) -import Index.VSplit qualified as Ix -import Ledger.TimeSlot (SlotConfig (..)) -import Ledger.Tx.CardanoAPI (withIsCardanoEra) -import Marconi.Index.Datum (DatumIndex) -import Marconi.Index.Datum qualified as Ix -import Options.Applicative (Parser, execParser, fullDesc, header, help, helper, info, long, metavar, progDesc, - strOption, (<**>)) -import Plutus.ChainIndex.Tx (ChainIndexTx (..)) -import Plutus.Contract.CardanoAPI (fromCardanoTx) -import Plutus.Script.Utils.V1.Scripts (Datum, DatumHash) - -{- | This executable is meant to exercise a set of indexers (for now datumhash -> datum) - against the mainnet (meant to be used for testing). - - In case you want to access the results of the datumhash indexer you need to query - the resulting database: - $ sqlite3 datums.sqlite - > select slotNo, datumHash, datum from kv_datumhsh_datum where slotNo = 39920450; - 39920450|679a55b523ff8d61942b2583b76e5d49498468164802ef1ebe513c685d6fb5c2|X(002f9787436835852ea78d3c45fc3d436b324184 --} - --- Options -data Options = Options - { socketPath :: FilePath - , dbPath :: FilePath - } - -options :: Parser Options -options = Options - <$> strOption - ( long "socket" - <> metavar "SOCKET" - <> help "Path to node socket." ) - <*> strOption - ( long "database" - <> metavar "DATABASE" - <> help "Path to database." ) - --- We only care about the mainnet -slotConfig :: SlotConfig -slotConfig = - SlotConfig - { scSlotZeroTime = 1596059091000 - , scSlotLength = 1000 - } - -networkId :: NetworkId -networkId = Mainnet - --- We don't generally need to sync blocks earlier than the Goguen era (other than --- testing for memory leaks) so we may want to start synchronising from a slot that --- is closer to Goguen era. -closeToGoguen :: ChainPoint -closeToGoguen = - ChainPoint - (SlotNo 39795032) - (fromJust $ parseHash "3e6f6450f85962d651654ee66091980b2332166f5505fd10b97b0520c9efac90") - -parseHash :: String -> Maybe (Hash BlockHeader) -parseHash hash = - deserialiseFromRawBytesHex (proxyToAsType Proxy) (encodeUtf8 $ pack hash) - -getDatums :: BlockInMode CardanoMode -> [(SlotNo, (DatumHash, Datum))] -getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) era) = withIsCardanoEra era $ concatMap (go era) txs - where - go :: C.IsCardanoEra era - => C.EraInMode era C.CardanoMode - -> C.Tx era - -> [(SlotNo, (DatumHash, Datum))] - go era' tx = - let hashes = either (const []) (assocs . _citxData) $ fromCardanoTx era' tx - in map (slotNo,) hashes - -processBlock :: IORef DatumIndex -> ChainSyncEvent -> IO () -processBlock ixref = \case - -- Not really supported - Resume point -> putStrLn ("resume " <> show point) >> pure () - RollForward blk@(BlockInMode (Block (BlockHeader slotNo _ blockNo@(BlockNo b)) _txs) _era) _tip -> do - when (b `rem` 1000 == 0) $ - putStrLn $ show slotNo <> " / " <> show blockNo - ix <- readIORef ixref - ix' <- Ix.insert (getDatums blk) ix - writeIORef ixref ix' - RollBackward point tip -> do - putStrLn ("rollback to " <> show tip) - rollbackToPoint point ixref - -rollbackToPoint - :: ChainPoint -> IORef DatumIndex -> IO () -rollbackToPoint point ixref = do - ix <- readIORef ixref - events <- Ix.getEvents (ix ^. Ix.storage) - let ix' = fromMaybe ix $ rollbackOffset events ix - writeIORef ixref ix' - where - rollbackOffset :: [Ix.Event] -> DatumIndex -> Maybe DatumIndex - rollbackOffset events ix = do - slot <- chainPointToSlotNo point - offset <- findIndex (any (\(s, _) -> s < slot)) events - Ix.rewind offset ix - -main :: IO () -main = do - options' <- execParser opts - tix <- Ix.open (dbPath options') (Ix.Depth 2160) >>= newIORef - _ <- runChainSync (socketPath options') - nullTracer - slotConfig - networkId - [closeToGoguen] - (processBlock tix) - forever $ threadDelay 1000000000 - where - opts = info (options <**> helper) - ( fullDesc - <> progDesc "Synchronise datums with mainnet" - <> header "indexer - an indexing proof of concept" ) diff --git a/plutus-chain-index/app/Marconi.hs b/plutus-chain-index/app/Marconi.hs new file mode 100644 index 0000000000..cefa011447 --- /dev/null +++ b/plutus-chain-index/app/Marconi.hs @@ -0,0 +1,150 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} + +module Main where + +import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, + ChainPoint (ChainPoint), EraInMode, Hash, IsCardanoEra, NetworkId (Mainnet, Testnet), + NetworkMagic (NetworkMagic), SlotNo (SlotNo), Tx, chainPointToSlotNo, deserialiseFromRawBytesHex, + proxyToAsType) +import Cardano.BM.Setup (withTrace) +import Cardano.BM.Trace (logError) +import Cardano.BM.Tracing (defaultConfigStdout) +import Control.Exception (catch) +import Control.Lens.Operators ((^.)) +import Data.ByteString.Char8 qualified as C8 +import Data.List (findIndex) +import Data.Map (assocs) +import Data.Maybe (fromJust, fromMaybe) +import Data.Proxy (Proxy (Proxy)) +import Index.VSplit qualified as Ix +import Ledger.Tx.CardanoAPI (withIsCardanoEra) +import Marconi.Index.Datum (DatumIndex) +import Marconi.Index.Datum qualified as Ix +import Marconi.Logging (logging) +import Options.Applicative (Parser, auto, execParser, flag', help, helper, info, long, maybeReader, metavar, option, + readerError, strOption, (<**>), (<|>)) +import Plutus.ChainIndex.Tx (ChainIndexTx (..)) +import Plutus.Contract.CardanoAPI (fromCardanoTx) +import Plutus.Script.Utils.V1.Scripts (Datum, DatumHash) +import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound), + withChainSyncEventStream) +import Prettyprinter (defaultLayoutOptions, layoutPretty, pretty, (<+>)) +import Prettyprinter.Render.Text (renderStrict) +import Streaming.Prelude qualified as S + +-- | This executable is meant to exercise a set of indexers (for now datumhash -> datum) +-- against the mainnet (meant to be used for testing). +-- +-- In case you want to access the results of the datumhash indexer you need to query +-- the resulting database: +-- $ sqlite3 datums.sqlite +-- > select slotNo, datumHash, datum from kv_datumhsh_datum where slotNo = 39920450; +-- 39920450|679a55b523ff8d61942b2583b76e5d49498468164802ef1ebe513c685d6fb5c2|X(002f9787436835852ea78d3c45fc3d436b324184 +data Options = Options + { optionsSocketPath :: String, + optionsNetworkId :: NetworkId, + optionsChainPoint :: ChainPoint, + optionsDatabasePath :: FilePath + } + deriving (Show) + +parseOptions :: IO Options +parseOptions = execParser $ info (optionsParser <**> helper) mempty + +optionsParser :: Parser Options +optionsParser = + Options + <$> strOption (long "socket-path" <> help "Path to node socket.") + <*> networkIdParser + <*> chainPointParser + <*> strOption (long "database-path" <> help "Path to database.") + +networkIdParser :: Parser NetworkId +networkIdParser = + pMainnet <|> pTestnet + where + pMainnet = + flag' + Mainnet + ( long "mainnet" + <> help "Use the mainnet magic id." + ) + + pTestnet = + Testnet . NetworkMagic + <$> option + auto + ( long "testnet-magic" + <> metavar "NATURAL" + <> help "Specify a testnet magic id." + ) + +chainPointParser :: Parser ChainPoint +chainPointParser = + pure chainPointCloseToGoguen + <|> ( ChainPoint + <$> option (SlotNo <$> auto) (long "slot-no" <> metavar "SLOT-NO") + <*> option + (maybeReader maybeParseHashBlockHeader <|> readerError "Malformed block hash") + (long "block-hash" <> metavar "BLOCK-HASH") + ) + where + -- We don't generally need to sync blocks earlier than the Goguen era (other than + -- testing for memory leaks) so we may want to start synchronising from a slot that + -- is closer to Goguen era. + chainPointCloseToGoguen = + ChainPoint + (SlotNo 39795032) + (fromJust $ maybeParseHashBlockHeader "3e6f6450f85962d651654ee66091980b2332166f5505fd10b97b0520c9efac90") + +getDatums :: BlockInMode CardanoMode -> [(SlotNo, (DatumHash, Datum))] +getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) era) = withIsCardanoEra era $ concatMap (go era) txs + where + go :: + IsCardanoEra era => + EraInMode era CardanoMode -> + Tx era -> + [(SlotNo, (DatumHash, Datum))] + go era' tx = + let hashes = either (const []) (assocs . _citxData) $ fromCardanoTx era' tx + in map (slotNo,) hashes + +main :: IO () +main = do + Options {optionsSocketPath, optionsNetworkId, optionsChainPoint, optionsDatabasePath} <- parseOptions + + let initial :: IO DatumIndex + initial = Ix.open optionsDatabasePath (Ix.Depth 2160) + + step :: DatumIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO DatumIndex + step index (RollForward blk _ct) = + Ix.insert (getDatums blk) index + step index (RollBackward cp _ct) = do + events <- Ix.getEvents (index ^. Ix.storage) + return $ + fromMaybe index $ do + slot <- chainPointToSlotNo cp + offset <- findIndex (any (\(s, _) -> s < slot)) events + Ix.rewind offset index + + finish :: DatumIndex -> IO () + finish _index = pure () -- Nothing to do here, perhaps we should use this to close the database? + c <- defaultConfigStdout + + withTrace c "marconi" $ \trace -> + withChainSyncEventStream + optionsSocketPath + optionsNetworkId + optionsChainPoint + (S.foldM_ step initial finish . logging trace) + `catch` \NoIntersectionFound -> + logError trace $ + renderStrict $ + layoutPretty defaultLayoutOptions $ + "No intersection found when looking for the chain point" <+> pretty optionsChainPoint <> "." + <+> "Please check the slot number and the block hash do belong to the chain" + +maybeParseHashBlockHeader :: String -> Maybe (Hash BlockHeader) +maybeParseHashBlockHeader = deserialiseFromRawBytesHex (proxyToAsType Proxy) . C8.pack diff --git a/plutus-chain-index/src/Marconi/Index/Datum.hs b/plutus-chain-index/app/Marconi/Index/Datum.hs similarity index 100% rename from plutus-chain-index/src/Marconi/Index/Datum.hs rename to plutus-chain-index/app/Marconi/Index/Datum.hs diff --git a/plutus-chain-index/app/Marconi/Logging.hs b/plutus-chain-index/app/Marconi/Logging.hs new file mode 100644 index 0000000000..ad262e0d06 --- /dev/null +++ b/plutus-chain-index/app/Marconi/Logging.hs @@ -0,0 +1,120 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Marconi.Logging (logging) where + +import Cardano.Api (Block (Block), BlockHeader (BlockHeader), BlockInMode (BlockInMode), CardanoMode, + ChainPoint (ChainPoint), ChainTip (ChainTip), SlotNo (SlotNo)) +import Cardano.BM.Trace (Trace, logInfo) +import Control.Monad (when) +import Data.IORef (IORef, modifyIORef', newIORef, readIORef) +import Data.Text (Text) +import Data.Time (NominalDiffTime, UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) +import Marconi.Orphans () +import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward)) +import Prettyprinter (Pretty (pretty), defaultLayoutOptions, layoutPretty, (<+>)) +import Prettyprinter.Render.Text (renderStrict) +import Streaming (Of, Stream, effect) +import Streaming.Prelude qualified as S +import Text.Printf (printf) + +data SyncStats = SyncStats + { -- | Number of applied blocks since last message + syncStatsNumBlocks :: !Int, + -- | Number of rollbacks + syncStatsNumRollbacks :: !Int, + -- | Timestamp of last printed message + syncStatsLastMessage :: !(Maybe UTCTime) + } + +logging :: + Trace IO Text -> + Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> + Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r +logging tracer s = effect $ do + stats <- newIORef (SyncStats 0 0 Nothing) + return $ S.chain (update stats) s + where + minSecondsBetweenMsg :: NominalDiffTime + minSecondsBetweenMsg = 10 + + update :: IORef SyncStats -> ChainSyncEvent (BlockInMode CardanoMode) -> IO () + update statsRef (RollForward bim ct) = do + let cp = case bim of (BlockInMode (Block (BlockHeader slotNo hash _blockNo) _txs) _eim) -> ChainPoint slotNo hash + modifyIORef' statsRef $ \stats -> + stats {syncStatsNumBlocks = syncStatsNumBlocks stats + 1} + printMessage statsRef cp ct + update statsRef (RollBackward cp ct) = do + modifyIORef' statsRef $ \stats -> + stats {syncStatsNumRollbacks = syncStatsNumRollbacks stats + 1} + printMessage statsRef cp ct + + printMessage statsRef cp ct = do + SyncStats {syncStatsNumBlocks, syncStatsNumRollbacks, syncStatsLastMessage} <- readIORef statsRef + + now <- getCurrentTime + + let timeSinceLastMsg = diffUTCTime now <$> syncStatsLastMessage + + let blocksMsg = case timeSinceLastMsg of + Nothing -> id + Just t -> \k -> + "Processed" + <+> pretty syncStatsNumBlocks + <+> "blocks in the last" + <+> pretty (formatTime defaultTimeLocale "%s" t) + <+> "seconds" + <+> let rate = fromIntegral syncStatsNumBlocks / realToFrac t :: Double + in pretty (printf "(%.0f blocks/sec)." rate :: String) + <+> k + + let rollbackMsg = case timeSinceLastMsg of + Nothing -> id + Just t -> \k -> + ( case syncStatsNumRollbacks of + 0 -> "No" + _ -> pretty syncStatsNumRollbacks + ) + <+> "rollbacks in the last" + <+> pretty (formatTime defaultTimeLocale "%s" t) + <+> "seconds." + <+> k + + let syncMsg = case (cp, ct) of + (ChainPoint (SlotNo chainPointSlot) _, ChainTip (SlotNo chainTipSlot) _header _blockNo) + -- TODO: MAGIC number here. Is there a better number? + -- 100 represents the number of slots before the + -- node where we consider the chain-index to be synced. + | chainTipSlot - chainPointSlot < 100 -> + "Synchronised." + (ChainPoint (SlotNo chainPointSlotNo) _, ChainTip (SlotNo chainTipSlotNo) _header _blockNo) -> + let pct = (100 :: Double) * fromIntegral chainPointSlotNo / fromIntegral chainTipSlotNo + in pretty + ( printf + "Synchronising. Current slot %d out of %d (%0.2f%%)." + chainPointSlotNo + chainTipSlotNo + pct :: + String + ) + _ -> "Starting." + + let shouldPrint = case timeSinceLastMsg of + Nothing -> True + Just t + | t > minSecondsBetweenMsg -> True + | otherwise -> False + + when shouldPrint $ do + logInfo tracer $ + renderStrict $ + layoutPretty defaultLayoutOptions $ + syncMsg <+> (blocksMsg $ rollbackMsg $ "Last block processed" <+> pretty cp <> ".") + modifyIORef' statsRef $ \stats -> + stats + { syncStatsNumBlocks = 0, + syncStatsNumRollbacks = 0, + syncStatsLastMessage = Just now + } diff --git a/plutus-chain-index/app/Marconi/Orphans.hs b/plutus-chain-index/app/Marconi/Orphans.hs new file mode 100644 index 0000000000..996064f606 --- /dev/null +++ b/plutus-chain-index/app/Marconi/Orphans.hs @@ -0,0 +1,26 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +module Marconi.Orphans where + +import Cardano.Api (BlockHeader, BlockNo (BlockNo), ChainPoint (..), ChainTip (ChainTip, ChainTipAtGenesis), Hash, + SlotNo (SlotNo), serialiseToRawBytesHexText) +import Prettyprinter (Pretty (pretty), (<+>)) + +instance Pretty ChainTip where + pretty ChainTipAtGenesis = "ChainTipAtGenesis" + pretty (ChainTip sn ha bn) = "ChainTip(" <> pretty sn <> "," <+> pretty ha <> "," <+> pretty bn <> ")" + +instance Pretty ChainPoint where + pretty ChainPointAtGenesis = "ChainPointAtGenesis" + pretty (ChainPoint sn ha) = "ChainPoint(" <> pretty sn <> "," <+> pretty ha <> ")" + +instance Pretty (Hash BlockHeader) where + pretty hash = "BlockHash" <+> pretty (serialiseToRawBytesHexText hash) + +instance Pretty SlotNo where + pretty (SlotNo n) = "Slot" <+> pretty n + +instance Pretty BlockNo where + pretty (BlockNo bn) = "BlockNo" <+> pretty bn diff --git a/plutus-chain-index/plutus-chain-index.cabal b/plutus-chain-index/plutus-chain-index.cabal index 8ff2322405..5e1b7b98c2 100644 --- a/plutus-chain-index/plutus-chain-index.cabal +++ b/plutus-chain-index/plutus-chain-index.cabal @@ -37,40 +37,35 @@ library Plutus.ChainIndex.Lib Plutus.ChainIndex.Logging Plutus.ChainIndex.SyncStats - Marconi.Index.Datum other-modules: Control.Concurrent.STM.TBMQueue hs-source-dirs: src build-depends: - plutus-ledger -any, - plutus-ledger-api -any, - plutus-chain-index-core -any, - plutus-script-utils -any, - freer-extras -any + plutus-ledger, + plutus-ledger-api, + plutus-chain-index-core, + freer-extras build-depends: - aeson -any, - async -any, + aeson, + async, base >=4.7 && <5, - beam-sqlite -any, - beam-migrate -any, - cardano-api -any, - contra-tracer -any, - clock -any, - data-default -any, - freer-simple -any, - iohk-monitoring -any, - lens -any, - optparse-applicative -any, - ouroboros-network -any, + beam-sqlite, + beam-migrate, + cardano-api, + contra-tracer, + clock, + data-default, + freer-simple, + iohk-monitoring, + lens, + optparse-applicative, + ouroboros-network, prettyprinter >=1.1.0.1, - resource-pool -any, - sqlite-simple -any, - stm -any, - time-units -any, - yaml -any, - serialise -any, - hysterical-screams -any, - bytestring -any, + resource-pool, + sqlite-simple, + stm, + time-units, + yaml, executable plutus-chain-index main-is: Main.hs @@ -83,11 +78,15 @@ executable plutus-chain-index -Wno-missing-import-lists -Wredundant-constraints -O0 build-depends: base >=4.9 && <5, - plutus-chain-index -any + plutus-chain-index -executable plutus-indexer - main-is: Indexer.hs +executable marconi + main-is: Marconi.hs hs-source-dirs: app + other-modules: + Marconi.Index.Datum + Marconi.Logging + Marconi.Orphans default-language: Haskell2010 default-extensions: ImportQualifiedPost ghc-options: @@ -96,14 +95,21 @@ executable plutus-indexer -Wno-missing-import-lists -Wredundant-constraints -O0 build-depends: base >=4.9 && <5, - plutus-chain-index -any, - iohk-monitoring -any, - cardano-api -any, - plutus-ledger -any, - plutus-chain-index-core -any, - plutus-script-utils -any, - containers -any, - lens -any, - hysterical-screams -any, - text -any, - optparse-applicative -any, + bytestring, + cardano-api, + containers, + hysterical-screams, + iohk-monitoring, + lens, + optparse-applicative, + plutus-chain-index, + plutus-chain-index-core, + plutus-ledger, + plutus-script-utils, + plutus-streaming, + prettyprinter, + serialise, + sqlite-simple, + streaming, + text, + time, diff --git a/plutus-streaming/src/Plutus/Streaming.hs b/plutus-streaming/src/Plutus/Streaming.hs index 6acaa6eaa7..fb07a454d7 100644 --- a/plutus-streaming/src/Plutus/Streaming.hs +++ b/plutus-streaming/src/Plutus/Streaming.hs @@ -14,9 +14,9 @@ import Cardano.Api (BlockInMode, CardanoMode, ChainPoint, ChainSyncClient (Chain import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext), ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound), ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward)) -import Control.Concurrent.Async (link, withAsync) +import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync) import Control.Concurrent.STM (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) -import Control.Exception (Exception, throw) +import Control.Exception (Exception, SomeException (SomeException), catch, throw) import GHC.Generics (Generic) import Streaming (Of, Stream) import Streaming.Prelude qualified as S @@ -86,6 +86,9 @@ withChainSyncEventStream socketPath networkId point consumer = do link a -- Run the consumer consumer $ S.repeatM $ atomically (readTChan readerChannel) + -- Let's rethrow exceptions from the client thread unwrapped, so that the + -- consumer does not have to know anything about async + `catch` \(ExceptionInLinkedThread _ (SomeException e)) -> throw e -- | `chainSyncStreamingClient` is the client that connects to a local node -- and runs the chain-sync mini-protocol. This client is fire-and-forget