Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
PLT-255 Fixed plutus-pab integration test (#496)
Browse files Browse the repository at this point in the history
* PLT-255 Fixed plutus-pab integration test

* Reorganized the placement of some functions

* Fixed the `awaitSlot` blockchain action in `plutus-pab` as it wasn't
  working. It was using as reference the slot of the last synced block
  instead of the actual current slot.

* Patched temporarly the PubKey contract so that it waits a bit before
  querying the chain-index. In the future, once we fix the discrepancy
  between information shared between plutus-pab and plutus-chain-index,
  this should be removed.

* Fixed the number of events that are processed by plutus-chain-index
  once we are in sync with the local node. The reason is that once we
  are fully in sync with the local node, we want to process as much
  information as possible and not wait for the queue to be filled.

* PLT-255 Fixed plutus-pab integration test

* Reorganized the placement of some functions

* Fixed the `awaitSlot` blockchain action in `plutus-pab` as it wasn't
  working. It was using as reference the slot of the last synced block
  instead of the actual current slot.

* Patched temporarly the PubKey contract so that it waits a bit before
  querying the chain-index. In the future, once we fix the discrepancy
  between information shared between plutus-pab and plutus-chain-index,
  this should be removed.

* Fixed the number of events that are processed by plutus-chain-index
  once we are in sync with the local node. The reason is that once we
  are fully in sync with the local node, we want to process as much
  information as possible and not wait for the queue to be filled.
  • Loading branch information
koslambrou committed Jun 22, 2022
1 parent 0c3c5c6 commit e322e49
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 90 deletions.
5 changes: 3 additions & 2 deletions plutus-chain-index/src/Plutus/ChainIndex/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import Plutus.ChainIndex.CommandLine (AppConfig (AppConfig, acCLIConfigOverrides
applyOverrides, cmdWithHelpParser)
import Plutus.ChainIndex.Compatibility (fromCardanoBlockNo)
import Plutus.ChainIndex.Config qualified as Config
import Plutus.ChainIndex.Events (measureEventByTxs, processEventsQueue)
import Plutus.ChainIndex.Events (measureEventQueueSizeByTxs, processEventsQueue)
import Plutus.ChainIndex.Lib (getTipSlot, storeChainSyncHandler, storeFromBlockNo, syncChainIndex, withRunRequirements)
import Plutus.ChainIndex.Logging qualified as Logging
import Plutus.ChainIndex.Server qualified as Server
Expand Down Expand Up @@ -82,7 +82,8 @@ runMainWithLog logger logConfig config = do
logger slotNoStr

-- Queue for processing events
eventsQueue <- newTBMQueueIO (Config.cicAppendTransactionQueueSize config) measureEventByTxs
let maxQueueSize = Config.cicAppendTransactionQueueSize config
eventsQueue <- newTBMQueueIO maxQueueSize (measureEventQueueSizeByTxs maxQueueSize)
syncHandler
<- storeChainSyncHandler eventsQueue
& storeFromBlockNo (fromCardanoBlockNo $ Config.cicStoreFrom config)
Expand Down
49 changes: 29 additions & 20 deletions plutus-chain-index/src/Plutus/ChainIndex/Events.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
Expand All @@ -21,7 +20,8 @@ import Numeric.Natural (Natural)
import Plutus.ChainIndex qualified as CI
import Plutus.ChainIndex.Lib (ChainSyncEvent (Resume, RollBackward, RollForward), EventsQueue, RunRequirements,
runChainIndexDuringSync)
import Plutus.ChainIndex.SyncStats (SyncLog, logProgress)
import Plutus.ChainIndex.SyncStats (SyncLog, getSyncState, isSyncStateSynced, logProgress)
import Plutus.ChainIndex.Types (tipAsPoint)
import Plutus.Monitoring.Util (PrettyObject (PrettyObject), convertLog, runLogEffects)
import System.Clock (Clock (Monotonic), TimeSpec, diffTimeSpec, getTime)

Expand All @@ -33,13 +33,21 @@ period = 2_000_000 -- 2s
-- By doing this we accumulate some number of blocks but with less than 'queueSize' number of transactions.
-- This approach helps to process blocks with a constant memory usage.
--
-- However, once we are in sync with the node, we want to process every block
-- instead of batches of blocks so that we can update the database as frequently
-- as possible.
--
-- Just accumulating 'queueSize' blocks doesn't work as a block can have any number of transactions.
-- It works fine at the beginning of the chain but later blocks grow in their size and the memory
-- usage grows tremendously.
measureEventByTxs :: ChainSyncEvent -> Natural
measureEventByTxs = \case
(RollForward (CI.Block _ transactions) _) -> fromIntegral $ length transactions
_ -> 1
measureEventQueueSizeByTxs :: Natural -> ChainSyncEvent -> Natural
measureEventQueueSizeByTxs maxQueueSize (RollForward (CI.Block syncTip transactions) nodeTip) =
let syncState = getSyncState (tipAsPoint syncTip) (tipAsPoint nodeTip)
txLen = fromIntegral $ length transactions
in if isSyncStateSynced syncState
then maxQueueSize + 1
else txLen
measureEventQueueSizeByTxs maxQueueSize _ = maxQueueSize + 1 -- to handle resume and rollback asap

-- | 'processEventsQueue' reads events from 'TBQueue', collects enough 'RollForward's to
-- append blocks at once.
Expand All @@ -56,17 +64,18 @@ processEventsQueue trace runReq eventsQueue = forever $ do
processEvents start eventsToProcess
where
processEvents :: TimeSpec -> [ChainSyncEvent] -> IO ()
processEvents start events = case events of
(Resume resumePoint) : (RollBackward backwardPoint _) : restEvents -> do
void $ runChainIndexDuringSync runReq $ do
CI.rollback backwardPoint
CI.resumeSync resumePoint
processEvents start restEvents
rollForwardEvents -> do
let
blocks = catMaybes $ rollForwardEvents <&> \case
(RollForward block _) -> Just block
_ -> Nothing
void $ runChainIndexDuringSync runReq $ CI.appendBlocks blocks
end <- getTime Monotonic
void $ runLogEffects (convertLog PrettyObject trace) $ logProgress events (diffTimeSpec end start)
processEvents start events =
case events of
(Resume resumePoint) : (RollBackward backwardPoint _) : restEvents -> do
void $ runChainIndexDuringSync runReq $ do
CI.rollback backwardPoint
CI.resumeSync resumePoint
processEvents start restEvents
rollForwardEvents -> do
let
blocks = catMaybes $ rollForwardEvents <&> \case
(RollForward block _) -> Just block
_ -> Nothing
void $ runChainIndexDuringSync runReq $ CI.appendBlocks blocks
end <- getTime Monotonic
void $ runLogEffects (convertLog PrettyObject trace) $ logProgress events (diffTimeSpec end start)
19 changes: 17 additions & 2 deletions plutus-chain-index/src/Plutus/ChainIndex/SyncStats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

Expand Down Expand Up @@ -70,6 +69,18 @@ data SyncState = Synced | Syncing Double | NotSyncing
deriving stock (Eq, Show, Generic)
deriving anyclass (FromJSON, ToJSON, ToObject)

isSyncStateSynced :: SyncState -> Bool
isSyncStateSynced Synced = True
isSyncStateSynced _ = False

isSyncStateNotSyncing :: SyncState -> Bool
isSyncStateNotSyncing NotSyncing = True
isSyncStateNotSyncing _ = False

isSyncStateSyncing :: SyncState -> Bool
isSyncStateSyncing (Syncing _) = True
isSyncStateSyncing _ = False

instance Pretty SyncState where
pretty = \case
Synced -> "Still in sync."
Expand All @@ -94,7 +105,11 @@ logProgress events period = do
-- Find a better way to calculate this percentage.
getSyncStateFromStats :: SyncStats -> SyncState
getSyncStateFromStats (SyncStats _ _ chainSyncPoint nodePoint) =
case (chainSyncPoint, nodePoint) of
getSyncState chainSyncPoint nodePoint

getSyncState :: CI.Point -> CI.Point -> SyncState
getSyncState chainIndexSyncPoint nodePoint =
case (chainIndexSyncPoint, nodePoint) of
(_, PointAtGenesis) -> NotSyncing
(CI.PointAtGenesis, CI.Point _ _) -> Syncing 0
(CI.Point (Slot chainSyncSlot) _, CI.Point (Slot nodeSlot) _)
Expand Down
5 changes: 1 addition & 4 deletions plutus-pab-executables/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@ Can be used to run end-to-end tests using a private local testnet.

2. Get config data:

Clone <https://github.com/input-output-hk/cardano-wallet/> to $DIR and set the
`SHELLEY_TEST_DATA` environment variable:

```
export SHELLEY_TEST_DATA=$DIR/lib/shelley/test/data/cardano-node-shelley
export SHELLEY_TEST_DATA=plutus-pab/local-cluster/cluster-data/cardano-node-shelley
```

3. Run the local cluster:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}

module ContractExample.IntegrationTest(run) where

import Control.Lens (makeClassyPrisms)
import Data.Aeson (FromJSON, ToJSON)
import Data.Map qualified as Map
import GHC.Generics (Generic)
Expand All @@ -25,6 +27,10 @@ data IError =
deriving stock (Eq, Haskell.Show, Generic)
deriving anyclass (ToJSON, FromJSON)

makeClassyPrisms ''IError

instance AsContractError IError where
_ContractError = _CError

run :: Contract () EmptySchema IError ()
run = runError run' >>= \case
Expand All @@ -34,10 +40,9 @@ run = runError run' >>= \case
run' :: Contract () EmptySchema IError ()
run' = do
logInfo @Haskell.String "Starting integration test"
pkh <- mapError CError ownPaymentPubKeyHash
pkh <- ownPaymentPubKeyHash
(txOutRef, ciTxOut, pkInst) <- mapError PKError (PubKey.pubKeyContract pkh (Ada.adaValueOf 10))
logInfo @Haskell.String "pubKey contract complete:"
logInfo txOutRef
let lookups =
Constraints.otherData (Datum $ getRedeemer unitRedeemer)
<> Constraints.unspentOutputs (maybe mempty (Map.singleton txOutRef) ciTxOut)
Expand All @@ -53,5 +58,5 @@ run' = do
Right redeemingTx -> do
let txi = getCardanoTxId redeemingTx
logInfo @Haskell.String $ "Waiting for tx " <> show txi <> " to complete"
mapError CError $ awaitTxConfirmed txi
awaitTxConfirmed txi
logInfo @Haskell.String "Tx confirmed. Integration test complete."
8 changes: 6 additions & 2 deletions plutus-pab-executables/test/full/Plutus/PAB/Simulator/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Plutus.PAB.Simulator.Test(runSimulation) where

import Control.Monad.Freer (interpret)
import Data.Default (Default (def))
import Ledger.Params (Params (..), allowBigTransactions)
import Ledger.Params (Params (pSlotConfig), allowBigTransactions)
import Ledger.TimeSlot (SlotConfig (..))
import Plutus.PAB.Core (EffectHandlers)
import Plutus.PAB.Effects.Contract.Builtin (Builtin, BuiltinHandler (contractHandler), handleBuiltin)
Expand All @@ -25,6 +25,10 @@ runSimulation = runSimulationWith simulatorHandlers
-- | 'EffectHandlers' for running the PAB as a simulator (no connectivity to
-- out-of-process services such as wallet backend, node, etc.)
simulatorHandlers :: EffectHandlers (Builtin TestContracts) (SimulatorState (Builtin TestContracts))
simulatorHandlers = mkSimulatorHandlers (allowBigTransactions $ def { pSlotConfig = def { scSlotLength = 1 } }) handler where
simulatorHandlers = mkSimulatorHandlers params handler
where
params :: Params
params = allowBigTransactions $ def { pSlotConfig = def { scSlotLength = 1 } }

handler :: SimulatorContractHandler (Builtin TestContracts)
handler = interpret (contractHandler handleBuiltin)
10 changes: 8 additions & 2 deletions plutus-pab/src/Cardano/Node/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ handleNodeClientClient params e = do
-- need to be sent via the wallet, not the mocked server node
-- (which is not actually running).
throwError TxSenderNotAvailable
Just handle -> liftIO $ onCardanoTx (MockClient.queueTx handle) (const $ error "Cardano.Node.Client: Expecting a mock tx, not an Alonzo tx when publishing it.") tx
Just handle ->
liftIO $
onCardanoTx (MockClient.queueTx handle)
(const $ error "Cardano.Node.Client: Expecting a mock tx, not an Alonzo tx when publishing it.")
tx
GetClientSlot ->
either (liftIO . MockClient.getCurrentSlot) (liftIO . Client.getCurrentSlot) chainSyncHandle
either (liftIO . MockClient.getCurrentSlot)
(liftIO . Client.getCurrentSlot)
chainSyncHandle
GetClientParams -> pure params

-- | This does not seem to support resuming so it means that the slot tick will
Expand Down
15 changes: 8 additions & 7 deletions plutus-pab/src/Plutus/PAB/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -548,17 +548,19 @@ yieldedExportTxs instanceId = do

currentSlot :: forall t env. PABAction t env (STM Slot)
currentSlot = do
Instances.BlockchainEnv{Instances.beCurrentSlot} <- asks @(PABEnvironment t env) blockchainEnv
pure $ STM.readTVar beCurrentSlot
be <- asks @(PABEnvironment t env) blockchainEnv
pure $ Instances.currentSlot be

-- | Wait until the target slot number has been reached
-- | Wait until the target slot number has been reached relative to the current
-- slot.
waitUntilSlot :: forall t env. Slot -> PABAction t env ()
waitUntilSlot targetSlot = do
tx <- currentSlot
void $ liftIO $ STM.atomically $ do
s <- tx
guard (s >= targetSlot)

-- | Wait for a certain number of slots relative to the current slot.
waitNSlots :: forall t env. Int -> PABAction t env ()
waitNSlots i = do
current <- currentSlot >>= liftIO . STM.atomically
Expand Down Expand Up @@ -645,7 +647,7 @@ handleInstancesStateReader = \case
Ask -> asks @(PABEnvironment t env) instancesState

-- | Handle the 'TimeEffect' by reading the current slot number from
-- the blockchain env.
-- the blockchain env.
handleTimeEffect ::
forall t env m effs.
( Member (Reader (PABEnvironment t env)) effs
Expand All @@ -656,6 +658,5 @@ handleTimeEffect ::
~> Eff effs
handleTimeEffect = \case
SystemTime -> do
Instances.BlockchainEnv{Instances.beCurrentSlot} <- asks @(PABEnvironment t env) blockchainEnv
liftIO $ STM.readTVarIO beCurrentSlot

be <- asks @(PABEnvironment t env) blockchainEnv
liftIO $ STM.atomically $ Instances.currentSlot be
22 changes: 12 additions & 10 deletions plutus-pab/src/Plutus/PAB/Core/ContractInstance.hs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ processAwaitSlotRequestsSTM =
maybeToHandler (extract Contract.Effects._AwaitSlotReq)
>>> (RequestHandler $ \targetSlot_ -> fmap AwaitSlotResp . InstanceState.awaitSlot targetSlot_ <$> ask)

processAwaitTimeRequestsSTM ::
forall effs.
( Member (Reader BlockchainEnv) effs
)
=> RequestHandler effs PABReq (STM PABResp)
processAwaitTimeRequestsSTM =
maybeToHandler (extract Contract.Effects._AwaitTimeReq) >>>
(RequestHandler $ \time ->
fmap AwaitTimeResp . InstanceState.awaitTime time <$> ask
)

processTxStatusChangeRequestsSTM ::
forall effs.
( Member (Reader BlockchainEnv) effs
Expand Down Expand Up @@ -235,15 +246,6 @@ processEndpointRequestsSTM =
maybeToHandler (traverse (extract Contract.Effects._ExposeEndpointReq))
>>> (RequestHandler $ \q@Request{rqID, itID, rqRequest} -> fmap (Response rqID itID) (fmap (ExposeEndpointResp (aeDescription rqRequest)) . InstanceState.awaitEndpointResponse q <$> ask))

processAwaitTimeRequestsSTM ::
forall effs.
( Member (Reader BlockchainEnv) effs
)
=> RequestHandler effs PABReq (STM PABResp)
processAwaitTimeRequestsSTM =
maybeToHandler (extract Contract.Effects._AwaitTimeReq)
>>> (RequestHandler $ \time -> fmap AwaitTimeResp . InstanceState.awaitTime time <$> ask)

-- | 'RequestHandler' that uses TVars to wait for events
stmRequestHandler ::
forall effs.
Expand Down Expand Up @@ -274,10 +276,10 @@ stmRequestHandler = fmap sequence (wrapHandler (fmap pure nonBlockingRequests) <
-- requests that wait for changes to happen
blockingRequests =
wrapHandler (processAwaitSlotRequestsSTM @effs)
<> wrapHandler (processAwaitTimeRequestsSTM @effs)
<> wrapHandler (processTxStatusChangeRequestsSTM @effs)
<> wrapHandler (processTxOutStatusChangeRequestsSTM @effs)
<> processEndpointRequestsSTM @effs
<> wrapHandler (processAwaitTimeRequestsSTM @effs)
<> processUtxoSpentRequestsSTM @effs
<> processUtxoProducedRequestsSTM @effs

Expand Down
Loading

0 comments on commit e322e49

Please sign in to comment.