Skip to content

Commit

Permalink
Merge #4239
Browse files Browse the repository at this point in the history
4239: tx-generator: fix a bug r=MarcFontaine a=MarcFontaine

The bug was introduced in :  #3815.
The number of total output transactions of the tx-generator was used for the number of outputs of a single transaction.
It was not found because the tests were only run for Plutus workloads and the bug was in a non-Plutus code-branch.

Co-authored-by: MarcFontaine <[email protected]>
  • Loading branch information
iohk-bors[bot] and MarcFontaine authored Jul 29, 2022
2 parents a9bc2f9 + be275eb commit aed8e71
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,13 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
fail (T.unpack err)
let (stillUnacked, acked) = L.splitAtEnd ack unAcked
let newStats = stats { stsAcked = stsAcked stats + Ack ack }
traceWith bmtr $ TraceBenchTxSubServAck (getTxId . getTxBody <$> acked)
traceWith bmtr $ SubmissionClientDiscardAcknowledged (getTxId . getTxBody <$> acked)
return (txSource, UnAcked stillUnacked, newStats)

queueNewTxs :: [tx] -> LocalState era -> LocalState era
queueNewTxs newTxs (txSource, UnAcked unAcked, stats)
= (txSource, UnAcked (newTxs <> unAcked), stats)

-- Sadly, we can't just return what we want, we instead have to
-- communicate via IORefs, because..
-- The () return type is forced by Ouroboros.Network.NodeToNode.connectTo
client ::LocalState era -> ClientStIdle (GenTxId CardanoBlock) (GenTx CardanoBlock) m ()

client localState = ClientStIdle
Expand All @@ -140,13 +137,14 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
req = Req $ fromIntegral reqNum
traceWith tr $ reqIdsTrace ack req blocking
stateA <- discardAcknowledged blocking ack state

traceWith bmtr $ TraceBenchTxSubDebug "return from discard"
(stateB, newTxs) <- produceNextTxs blocking req stateA
traceWith bmtr $ TraceBenchTxSubDebug "return from produceNext"
let stateC@(_, UnAcked outs , stats) = queueNewTxs newTxs stateB

traceWith tr $ idListTrace (ToAnnce newTxs) blocking
traceWith bmtr $ TraceBenchTxSubServAnn (getTxId . getTxBody <$> newTxs)
traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> outs)
traceWith bmtr $ SubmissionClientReplyTxIds (getTxId . getTxBody <$> newTxs)
traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> outs)

case blocking of
TokBlocking -> case NE.nonEmpty newTxs of
Expand Down Expand Up @@ -175,8 +173,8 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
missIds = reqTxIds L.\\ uaIds

traceWith tr $ TxList (length toSend)
traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> ua)
traceWith bmtr $ TraceBenchTxSubServReq reqTxIds
traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> ua)
unless (L.null missIds) $
traceWith bmtr $ TraceBenchTxSubServUnav missIds
pure $ SendMsgReplyTxs (toGenTx <$> toSend)
Expand Down Expand Up @@ -213,10 +211,9 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace ack req = \case
TokBlocking -> ReqIdsBlocking ack req
TokNonBlocking -> ReqIdsPrompt ack req
TokNonBlocking -> ReqIdsNonBlocking ack req

idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace (ToAnnce toAnn) = \case
TokBlocking -> IdsListBlocking $ length toAnn
TokNonBlocking -> IdsListPrompt $ length toAnn

TokNonBlocking -> IdsListNonBlocking $ length toAnn
10 changes: 5 additions & 5 deletions bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ data TraceBenchTxSubmit txid
| TraceBenchTxSubStart [txid]
-- ^ The @txid@ has been submitted to `TxSubmission`
-- protocol peer.
| TraceBenchTxSubServAnn [txid]
| SubmissionClientReplyTxIds [txid]
-- ^ Announcing txids in response for server's request.
| TraceBenchTxSubServReq [txid]
-- ^ Request for @tx@ received from `TxSubmission` protocol
-- peer.
| TraceBenchTxSubServAck [txid]
| SubmissionClientDiscardAcknowledged [txid]
-- ^ An ack (window moved over) received for these transactions.
| TraceBenchTxSubServDrop [txid]
-- ^ Transactions the server implicitly dropped.
| TraceBenchTxSubServOuts [txid]
| SubmissionClientUnAcked [txid]
-- ^ Transactions outstanding.
| TraceBenchTxSubServUnav [txid]
-- ^ Transactions requested, but unavailable in the outstanding set.
Expand Down Expand Up @@ -113,8 +113,8 @@ instance ToJSON SubmissionSummary
data NodeToNodeSubmissionTrace
= ReqIdsBlocking Ack Req
| IdsListBlocking Int
| ReqIdsPrompt Ack Req
| IdsListPrompt Int
| ReqIdsNonBlocking Ack Req
| IdsListNonBlocking Int
| ReqTxs Int
| TxList Int
| EndOfProtocol
Expand Down
3 changes: 1 addition & 2 deletions bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ runBenchmarkInEra sourceWallet submitMode (ThreadName threadName) shape tps era

let
inToOut :: [Lovelace] -> [Lovelace]
inToOut = FundSet.inputsToOutputsWithFee (auxFee shape) (auxOutputs shape)
inToOut = FundSet.inputsToOutputsWithFee (auxFee shape) (auxOutputsPerTx shape)

txGenerator = genTx protocolParameters (TxInsCollateralNone, []) (mkFee (auxFee shape)) metadata (KeyWitness KeyWitnessForSpending)

Expand Down Expand Up @@ -642,4 +642,3 @@ and for which the JSON encoding is "reserved".
reserved :: [String] -> ActionM ()
reserved _ = do
throwE $ UserError "no dirty hack is implemented"

37 changes: 23 additions & 14 deletions bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ data TpsThrottle = TpsThrottle {
startSending :: IO ()
, sendStop :: STM ()
, receiveBlocking :: STM Step
, receiveNoneBlocking :: STM (Maybe Step)
, receiveNonBlocking :: STM (Maybe Step)
}

-- TVar state ::
-- empty -> Block submission
-- Just n -> allow n transmissions
-- Just 0 -> illegal state
-- Just n -> allow n transmissions ( n must be >0 )
-- Nothing -> teminate transmission

newTpsThrottle :: Int -> Int -> TPSRate -> IO TpsThrottle
Expand All @@ -34,19 +35,16 @@ newTpsThrottle buffersize count tpsRate = do
startSending = sendNTicks tpsRate buffersize count var
, sendStop = putTMVar var Nothing
, receiveBlocking = takeTMVar var >>= receiveAction var
, receiveNoneBlocking = do
s <- tryTakeTMVar var
case s of
Nothing -> return Nothing
Just state -> Just <$> receiveAction var state
, receiveNonBlocking =
(Just <$> (takeTMVar var >>= receiveAction var )) `orElse` return Nothing
}

receiveAction :: TMVar (Maybe Int) -> Maybe Int -> STM Step
receiveAction var state = case state of
Nothing -> do
putTMVar var Nothing
return Stop
Just 1 -> return Next -- leave var empty
Just 1 -> return Next -- leave var empty, i.e. block submission until sendNTicks unblocks
Just n -> do
-- decrease counter and let other threads transmit
putTMVar var $ Just $ pred n
Expand All @@ -59,15 +57,15 @@ sendNTicks (TPSRate rate) buffersize count var = do
where
worker 0 _ _ = return ()
worker n lastPreDelay lastDelay = do
atomically increaseWatermark
increaseWatermark
now <- Clock.getCurrentTime
let targetDelay = realToFrac $ 1.0 / rate
loopCost = (now `Clock.diffUTCTime` lastPreDelay) - lastDelay
delay = targetDelay - loopCost
threadDelay . ceiling $ (realToFrac delay * 1000000.0 :: Double)
worker (pred n) now delay
-- increaseWatermark can retry/block if there are already buffersize ticks in the "queue"
increaseWatermark = do
increaseWatermark = atomically $ do
s <- tryTakeTMVar var
case s of
Nothing -> putTMVar var $ Just 1
Expand All @@ -90,19 +88,18 @@ consumeTxsNonBlocking tpsThrottle req
= if req==0
then pure (Next, 0)
else do
STM.atomically (receiveNoneBlocking tpsThrottle) >>= \case
STM.atomically (receiveNonBlocking tpsThrottle) >>= \case
Nothing -> pure (Next, 0)
Just Stop -> pure (Stop, 0)
Just Next -> pure (Next, 1)



test :: IO ()
test = do
t <- newTpsThrottle 10 50 2
_threadId <- startThrottle t
threadDelay 5000000
forM_ [1 .. 5] $ \i -> forkIO $ consumer t i
forM_ [6 .. 7] $ \i -> forkIO $ consumer2 t i
putStrLn "done"
where
startThrottle t = forkIO $ do
Expand All @@ -114,4 +111,16 @@ test = do
consumer t n = do
s <- atomically $ receiveBlocking t
print (n, s)
if s==Next then consumer t n else putStrLn $ "Done " ++ show n
if s == Next then consumer t n else putStrLn $ "Done " ++ show n

consumer2 :: TpsThrottle -> Int -> IO ()
consumer2 t n = do
r <- atomically $ receiveNonBlocking t
case r of
Just s -> do
print (n, s)
if s == Next then consumer2 t n else putStrLn $ "Done " ++ show n
Nothing -> do
putStrLn $ "wait " ++ show n
threadDelay 100000
consumer2 t n
20 changes: 10 additions & 10 deletions bench/tx-generator/src/Cardano/Benchmarking/Tracer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,24 +129,24 @@ instance LogFormatting (TraceBenchTxSubmit TxId) where
mconcat [ "kind" .= A.String "TraceBenchTxSubStart"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServAnn txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServAnn"
SubmissionClientReplyTxIds txIds ->
mconcat [ "kind" .= A.String "SubmissionClientReplyTxIds"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServReq txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServReq"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServAck txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServAck"
SubmissionClientDiscardAcknowledged txIds ->
mconcat [ "kind" .= A.String "SubmissionClientDiscardAcknowledged"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServDrop txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServDrop"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServOuts txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServOuts"
SubmissionClientUnAcked txIds ->
mconcat [ "kind" .= A.String "SubmissionClientUnAcked"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServUnav txIds ->
Expand Down Expand Up @@ -196,12 +196,12 @@ instance LogFormatting NodeToNodeSubmissionTrace where
IdsListBlocking sent -> KeyMap.fromList
[ "kind" .= A.String "IdsListBlocking"
, "sent" .= A.toJSON sent ]
ReqIdsPrompt (Ack ack) (Req req) -> KeyMap.fromList
[ "kind" .= A.String "ReqIdsPrompt"
ReqIdsNonBlocking (Ack ack) (Req req) -> KeyMap.fromList
[ "kind" .= A.String "ReqIdsNonBlocking"
, "ack" .= A.toJSON ack
, "req" .= A.toJSON req ]
IdsListPrompt sent -> KeyMap.fromList
[ "kind" .= A.String "IdsListPrompt"
IdsListNonBlocking sent -> KeyMap.fromList
[ "kind" .= A.String "IdsListNonBlocking"
, "sent" .= A.toJSON sent ]
EndOfProtocol -> KeyMap.fromList [ "kind" .= A.String "EndOfProtocol" ]
ReqTxs req -> KeyMap.fromList
Expand Down

0 comments on commit aed8e71

Please sign in to comment.