Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cardano-tracer: refactoring #4233

Merged
merged 1 commit into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions cardano-tracer/bench/cardano-tracer-bench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,85 @@ import System.Directory (getTemporaryDirectory, removePathForcibly)

import Cardano.Logging hiding (LocalSocket)

import Cardano.Tracer.Handlers.RTView.Run
import Cardano.Tracer.Handlers.RTView.State.Historical
import Cardano.Tracer.Utils

import Cardano.Tracer.Configuration
import Cardano.Tracer.Handlers.Logs.TraceObjects (traceObjectsHandler)
import Cardano.Tracer.Handlers.RTView.State.TraceObjects (initSavedTraceObjects)
import Cardano.Tracer.Types (NodeId (..))
import Cardano.Tracer.Environment
import Cardano.Tracer.Handlers.Logs.TraceObjects
import Cardano.Tracer.Types

main :: IO ()
main = do
tmpDir <- getTemporaryDirectory
let root = tmpDir </> "cardano-tracer-bench"
c1 = mkConfig root ForHuman
c2 = mkConfig root ForMachine
lock <- newLock

to10 <- generate 10
to100 <- generate 100
to1000 <- generate 1000

savedTO <- initSavedTraceObjects
connectedNodes <- initConnectedNodes
acceptedMetrics <- initAcceptedMetrics
savedTO <- initSavedTraceObjects

chainHistory <- initBlockchainHistory
resourcesHistory <- initResourcesHistory
txHistory <- initTransactionsHistory

protocolsBrake <- initProtocolsBrake
dpRequestors <- initDataPointRequestors

currentLogLock <- newLock
currentDPLock <- newLock
eventsQueues <- initEventsQueues dpRequestors currentDPLock

let te1 =
TracerEnv
{ teConfig = c1
, teConnectedNodes = connectedNodes
, teAcceptedMetrics = acceptedMetrics
, teSavedTO = savedTO
, teBlockchainHistory = chainHistory
, teResourcesHistory = resourcesHistory
, teTxHistory = txHistory
, teCurrentLogLock = currentLogLock
, teCurrentDPLock = currentDPLock
, teEventsQueues = eventsQueues
, teDPRequestors = dpRequestors
, teProtocolsBrake = protocolsBrake
}
te2 =
TracerEnv
{ teConfig = c2
, teConnectedNodes = connectedNodes
, teAcceptedMetrics = acceptedMetrics
, teSavedTO = savedTO
, teBlockchainHistory = chainHistory
, teResourcesHistory = resourcesHistory
, teTxHistory = txHistory
, teCurrentLogLock = currentLogLock
, teCurrentDPLock = currentDPLock
, teEventsQueues = eventsQueues
, teDPRequestors = dpRequestors
, teProtocolsBrake = protocolsBrake
}

removePathForcibly root

defaultMain
[ bgroup "cardano-tracer"
[ -- 10 'TraceObject's per request.
bench "Handle TraceObjects LOG, 10" $ whnfIO $ traceObjectsHandler c1 nId lock savedTO to10
, bench "Handle TraceObjects JSON, 10" $ whnfIO $ traceObjectsHandler c2 nId lock savedTO to10
bench "Handle TraceObjects LOG, 10" $ whnfIO $ traceObjectsHandler te1 nId to10
, bench "Handle TraceObjects JSON, 10" $ whnfIO $ traceObjectsHandler te2 nId to10
-- 100 'TraceObject's per request.
, bench "Handle TraceObjects LOG, 100" $ whnfIO $ traceObjectsHandler c1 nId lock savedTO to100
, bench "Handle TraceObjects JSON, 100" $ whnfIO $ traceObjectsHandler c2 nId lock savedTO to100
, bench "Handle TraceObjects LOG, 100" $ whnfIO $ traceObjectsHandler te1 nId to100
, bench "Handle TraceObjects JSON, 100" $ whnfIO $ traceObjectsHandler te2 nId to100
-- 1000 'TraceObject's per request.
, bench "Handle TraceObjects LOG, 1000" $ whnfIO $ traceObjectsHandler c1 nId lock savedTO to1000
, bench "Handle TraceObjects JSON, 1000" $ whnfIO $ traceObjectsHandler c2 nId lock savedTO to1000
, bench "Handle TraceObjects LOG, 1000" $ whnfIO $ traceObjectsHandler te1 nId to1000
, bench "Handle TraceObjects JSON, 1000" $ whnfIO $ traceObjectsHandler te2 nId to1000
]
]
where
Expand Down
3 changes: 3 additions & 0 deletions cardano-tracer/cardano-tracer.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ library
Cardano.Tracer.Handlers.RTView.Update.Transactions
Cardano.Tracer.Handlers.RTView.Update.Utils

Cardano.Tracer.Handlers.RTView.Utils

Cardano.Tracer.CLI
Cardano.Tracer.Configuration
Cardano.Tracer.Environment
Cardano.Tracer.Run
Cardano.Tracer.Types
Cardano.Tracer.Utils
Expand Down
72 changes: 28 additions & 44 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ module Cardano.Tracer.Acceptors.Client
) where

import Codec.CBOR.Term (Term)
import Control.Concurrent.Extra (Lock)
import qualified Data.ByteString.Lazy as LBS
import Data.Void (Void)
import Data.Word (Word32)
Expand Down Expand Up @@ -39,42 +38,31 @@ import Trace.Forward.Run.TraceObject.Acceptor (acceptTraceObjectsInit)
import Cardano.Tracer.Acceptors.Utils (notifyAboutNodeDisconnected,
prepareDataPointRequestor, prepareMetricsStores, removeDisconnectedNode)
import qualified Cardano.Tracer.Configuration as TC
import Cardano.Tracer.Environment
import Cardano.Tracer.Handlers.Logs.TraceObjects (traceObjectsHandler)
import Cardano.Tracer.Handlers.RTView.Notifications.Types
import Cardano.Tracer.Handlers.RTView.Run (SavedTraceObjects)
import Cardano.Tracer.Types (AcceptedMetrics, ConnectedNodes, DataPointRequestors)
import Cardano.Tracer.Utils (connIdToNodeId)

runAcceptorsClient
:: TC.TracerConfig
:: TracerEnv
-> FilePath
-> ( EKGF.AcceptorConfiguration
, TF.AcceptorConfiguration TraceObject
, DPF.AcceptorConfiguration
)
-> ConnectedNodes
-> AcceptedMetrics
-> SavedTraceObjects
-> DataPointRequestors
-> Lock
-> EventsQueues
-> IO ()
runAcceptorsClient config p (ekgConfig, tfConfig, dpfConfig)
connectedNodes acceptedMetrics savedTO
dpRequestors currentLogLock eventsQueues =
withIOManager $ \iocp ->
doConnectToForwarder
(localSnocket iocp)
(localAddressFromPath p)
(TC.networkMagic config)
noTimeLimitsHandshake $
-- Please note that we always run all the supported protocols,
-- there is no mechanism to disable some of them.
appInitiator
[ (runEKGAcceptorInit ekgConfig connectedNodes acceptedMetrics errorHandler, 1)
, (runTraceObjectsAcceptorInit config tfConfig currentLogLock savedTO errorHandler, 2)
, (runDataPointsAcceptorInit dpfConfig connectedNodes dpRequestors errorHandler, 3)
]
runAcceptorsClient tracerEnv p (ekgConfig, tfConfig, dpfConfig) = withIOManager $ \iocp ->
doConnectToForwarder
(localSnocket iocp)
(localAddressFromPath p)
(TC.networkMagic $ teConfig tracerEnv)
noTimeLimitsHandshake $
-- Please note that we always run all the supported protocols,
-- there is no mechanism to disable some of them.
appInitiator
[ (runEKGAcceptorInit tracerEnv ekgConfig errorHandler, 1)
, (runTraceObjectsAcceptorInit tracerEnv tfConfig errorHandler, 2)
, (runDataPointsAcceptorInit tracerEnv dpfConfig errorHandler, 3)
]
where
appInitiator protocolsWithNums =
OuroborosApplication $ \connectionId _shouldStopSTM ->
Expand All @@ -86,8 +74,8 @@ runAcceptorsClient config p (ekgConfig, tfConfig, dpfConfig)
| (protocol, num) <- protocolsWithNums
]
errorHandler connId = do
removeDisconnectedNode connectedNodes acceptedMetrics dpRequestors connId
notifyAboutNodeDisconnected eventsQueues connId
removeDisconnectedNode tracerEnv connId
notifyAboutNodeDisconnected tracerEnv connId

doConnectToForwarder
:: Snocket IO LocalSocket LocalAddress
Expand All @@ -113,41 +101,37 @@ doConnectToForwarder snocket address netMagic timeLimits app =
address

runEKGAcceptorInit
:: EKGF.AcceptorConfiguration
-> ConnectedNodes
-> AcceptedMetrics
:: TracerEnv
-> EKGF.AcceptorConfiguration
-> (ConnectionId LocalAddress -> IO ())
-> ConnectionId LocalAddress
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
runEKGAcceptorInit ekgConfig connectedNodes acceptedMetrics errorHandler connId =
runEKGAcceptorInit tracerEnv ekgConfig errorHandler connId =
acceptEKGMetricsInit
ekgConfig
(prepareMetricsStores connectedNodes acceptedMetrics connId)
(prepareMetricsStores tracerEnv connId)
(errorHandler connId)

runTraceObjectsAcceptorInit
:: TC.TracerConfig
:: TracerEnv
-> TF.AcceptorConfiguration TraceObject
-> Lock
-> SavedTraceObjects
-> (ConnectionId LocalAddress -> IO ())
-> ConnectionId LocalAddress
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
runTraceObjectsAcceptorInit config tfConfig currentLogLock savedTO errorHandler connId =
runTraceObjectsAcceptorInit tracerEnv tfConfig errorHandler connId =
acceptTraceObjectsInit
tfConfig
(traceObjectsHandler config (connIdToNodeId connId) currentLogLock savedTO)
(traceObjectsHandler tracerEnv $ connIdToNodeId connId)
(errorHandler connId)

runDataPointsAcceptorInit
:: DPF.AcceptorConfiguration
-> ConnectedNodes
-> DataPointRequestors
:: TracerEnv
-> DPF.AcceptorConfiguration
-> (ConnectionId LocalAddress -> IO ())
-> ConnectionId LocalAddress
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
runDataPointsAcceptorInit dpfConfig connectedNodes dpRequestors errorHandler connId =
runDataPointsAcceptorInit tracerEnv dpfConfig errorHandler connId =
acceptDataPointsInit
dpfConfig
(prepareDataPointRequestor connectedNodes dpRequestors connId)
(prepareDataPointRequestor tracerEnv connId)
(errorHandler connId)
49 changes: 17 additions & 32 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ module Cardano.Tracer.Acceptors.Run
) where

import Control.Concurrent.Async (forConcurrently_)
import Control.Concurrent.Extra (Lock)
import "contra-tracer" Control.Tracer (Tracer, contramap, nullTracer,
stdoutTracer)
import qualified Data.List.NonEmpty as NE
Expand All @@ -20,71 +19,57 @@ import qualified Trace.Forward.Configuration.DataPoint as DPF
import qualified Trace.Forward.Configuration.TraceObject as TOF
import qualified Trace.Forward.Protocol.TraceObject.Type as TOF

import Cardano.Tracer.Acceptors.Client (runAcceptorsClient)
import Cardano.Tracer.Acceptors.Server (runAcceptorsServer)
import Cardano.Tracer.Acceptors.Client
import Cardano.Tracer.Acceptors.Server
import Cardano.Tracer.Configuration
import Cardano.Tracer.Handlers.RTView.Notifications.Types
import Cardano.Tracer.Handlers.RTView.Run (SavedTraceObjects)
import Cardano.Tracer.Types (AcceptedMetrics, ConnectedNodes,
DataPointRequestors, ProtocolsBrake)
import Cardano.Tracer.Utils (runInLoop)
import Cardano.Tracer.Environment
import Cardano.Tracer.Utils

-- | Run acceptors for all supported protocols.
--
-- There are two "network modes" for acceptors:
-- 1. Server mode, when the tracer accepts connections from any number of nodes.
-- 2. Client mode, when the tracer initiates connections to specified number of nodes.
runAcceptors
:: TracerConfig
-> ConnectedNodes
-> AcceptedMetrics
-> SavedTraceObjects
-> DataPointRequestors
-> ProtocolsBrake
-> Lock
-> EventsQueues
-> IO ()
runAcceptors c@TracerConfig{network, ekgRequestFreq, loRequestNum, verbosity}
connectedNodes acceptedMetrics savedTO dpRequestors stopIt
currentLogLock eventsQueues =
runAcceptors :: TracerEnv -> IO ()
runAcceptors tracerEnv =
case network of
AcceptAt (LocalSocket p) ->
-- Run one server that accepts connections from the nodes.
runInLoop
(runAcceptorsServer c p (acceptorsConfigs p) connectedNodes
acceptedMetrics savedTO dpRequestors
currentLogLock eventsQueues)
verbosity p 1
(runAcceptorsServer tracerEnv p $ acceptorsConfigs p)
verbosity p initialPauseInSec
ConnectTo localSocks ->
-- Run N clients that initiate connections to the nodes.
forConcurrently_ (NE.nub localSocks) $ \(LocalSocket p) ->
runInLoop
(runAcceptorsClient c p (acceptorsConfigs p) connectedNodes
acceptedMetrics savedTO dpRequestors
currentLogLock eventsQueues)
verbosity p 1
(runAcceptorsClient tracerEnv p $ acceptorsConfigs p)
verbosity p initialPauseInSec
where
TracerConfig{network, ekgRequestFreq, loRequestNum, verbosity} = teConfig tracerEnv

acceptorsConfigs p =
( EKGF.AcceptorConfiguration
{ EKGF.acceptorTracer = mkVerbosity verbosity
, EKGF.forwarderEndpoint = EKGF.LocalPipe p
, EKGF.requestFrequency = secondsToNominalDiffTime $ fromMaybe 1.0 ekgRequestFreq
, EKGF.whatToRequest = EKGF.GetAllMetrics
, EKGF.shouldWeStop = stopIt
, EKGF.shouldWeStop = teProtocolsBrake tracerEnv
}
, TOF.AcceptorConfiguration
{ TOF.acceptorTracer = mkVerbosity verbosity
, TOF.forwarderEndpoint = p
, TOF.whatToRequest = TOF.NumberOfTraceObjects $ fromMaybe 100 loRequestNum
, TOF.shouldWeStop = stopIt
, TOF.shouldWeStop = teProtocolsBrake tracerEnv
}
, DPF.AcceptorConfiguration
{ DPF.acceptorTracer = mkVerbosity verbosity
, DPF.forwarderEndpoint = p
, DPF.shouldWeStop = stopIt
, DPF.shouldWeStop = teProtocolsBrake tracerEnv
}
)

initialPauseInSec = 1

mkVerbosity :: Show a => Maybe Verbosity -> Tracer IO a
mkVerbosity (Just Maximum) = contramap show stdoutTracer
mkVerbosity _ = nullTracer
Loading