Skip to content

Commit f88b3ae

Browse files
committed
Register and wire-in PerasCertDiffusion in the network layer
1 parent 626d283 commit f88b3ae

File tree

5 files changed

+161
-30
lines changed

5 files changed

+161
-30
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs

Lines changed: 141 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
6868
)
6969
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
7070
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
71+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
72+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
73+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
74+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
7175
import Ouroboros.Consensus.Node.ExitPolicy
7276
import Ouroboros.Consensus.Node.NetworkProtocolVersion
7377
import Ouroboros.Consensus.Node.Run
@@ -81,10 +85,6 @@ import Ouroboros.Consensus.Util.IOLike
8185
import Ouroboros.Consensus.Util.Orphans ()
8286
import Ouroboros.Network.Block
8387
( Serialised (..)
84-
, decodePoint
85-
, decodeTip
86-
, encodePoint
87-
, encodeTip
8888
)
8989
import Ouroboros.Network.BlockFetch
9090
import Ouroboros.Network.BlockFetch.Client
@@ -124,6 +124,18 @@ import Ouroboros.Network.Protocol.KeepAlive.Client
124124
import Ouroboros.Network.Protocol.KeepAlive.Codec
125125
import Ouroboros.Network.Protocol.KeepAlive.Server
126126
import Ouroboros.Network.Protocol.KeepAlive.Type
127+
import Ouroboros.Network.Protocol.ObjectDiffusion.Codec
128+
( byteLimitsObjectDiffusion
129+
, codecObjectDiffusion
130+
, codecObjectDiffusionId
131+
, timeLimitsObjectDiffusion
132+
)
133+
import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
134+
( objectDiffusionInboundPeerPipelined
135+
)
136+
import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
137+
( objectDiffusionOutboundPeer
138+
)
127139
import Ouroboros.Network.Protocol.PeerSharing.Client
128140
( PeerSharingClient
129141
, peerSharingClientPeer
@@ -197,6 +209,15 @@ data Handlers m addr blk = Handlers
197209
NodeToNodeVersion ->
198210
ConnectionId addr ->
199211
TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ()
212+
, hPerasCertDiffusionClient ::
213+
NodeToNodeVersion ->
214+
ControlMessageSTM m ->
215+
ConnectionId addr ->
216+
PerasCertDiffusionInboundPipelined blk m ()
217+
, hPerasCertDiffusionServer ::
218+
NodeToNodeVersion ->
219+
ConnectionId addr ->
220+
PerasCertDiffusionOutbound blk m ()
200221
, hKeepAliveClient ::
201222
NodeToNodeVersion ->
202223
ControlMessageSTM m ->
@@ -293,6 +314,22 @@ mkHandlers
293314
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
294315
(getMempoolWriter getMempool)
295316
version
317+
, hPerasCertDiffusionClient = \version controlMessageSTM peer ->
318+
objectDiffusionInbound
319+
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionInboundTracer tracers))
320+
( perasCertDiffusionMaxFifoLength miniProtocolParameters
321+
, 10 -- TODO https://github.com/tweag/cardano-peras/issues/97
322+
, 10 -- TODO https://github.com/tweag/cardano-peras/issues/97
323+
)
324+
(makePerasCertPoolWriterFromChainDB $ getChainDB)
325+
version
326+
controlMessageSTM
327+
, hPerasCertDiffusionServer = \version peer ->
328+
objectDiffusionOutbound
329+
(contramap (TraceLabelPeer peer) (Node.perasCertDiffusionOutboundTracer tracers))
330+
(perasCertDiffusionMaxFifoLength miniProtocolParameters)
331+
(makePerasCertPoolReaderFromChainDB $ getChainDB)
332+
version
296333
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng
297334
, hKeepAliveServer = \_version _peer -> keepAliveServer
298335
, hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM
@@ -304,14 +341,15 @@ mkHandlers
304341
-------------------------------------------------------------------------------}
305342

306343
-- | Node-to-node protocol codecs needed to run 'Handlers'.
307-
data Codecs blk addr e m bCS bSCS bBF bSBF bTX bKA bPS = Codecs
344+
data Codecs blk addr e m bCS bSCS bBF bSBF bTX bPCD bKA bPS = Codecs
308345
{ cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS
309346
, cChainSyncCodecSerialised ::
310347
Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS
311348
, cBlockFetchCodec :: Codec (BlockFetch blk (Point blk)) e m bBF
312349
, cBlockFetchCodecSerialised ::
313350
Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF
314351
, cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX
352+
, cPerasCertDiffusionCodec :: Codec (PerasCertDiffusion blk) e m bPCD
315353
, cKeepAliveCodec :: Codec KeepAlive e m bKA
316354
, cPeerSharingCodec :: Codec (PeerSharing addr) e m bPS
317355
}
@@ -339,49 +377,53 @@ defaultCodecs ::
339377
ByteString
340378
ByteString
341379
ByteString
380+
ByteString
342381
defaultCodecs ccfg version encAddr decAddr nodeToNodeVersion =
343382
Codecs
344383
{ cChainSyncCodec =
345384
codecChainSync
346385
enc
347386
dec
348-
(encodePoint (encodeRawHash p))
349-
(decodePoint (decodeRawHash p))
350-
(encodeTip (encodeRawHash p))
351-
(decodeTip (decodeRawHash p))
387+
enc
388+
dec
389+
enc
390+
dec
352391
, cChainSyncCodecSerialised =
353392
codecChainSync
354393
enc
355394
dec
356-
(encodePoint (encodeRawHash p))
357-
(decodePoint (decodeRawHash p))
358-
(encodeTip (encodeRawHash p))
359-
(decodeTip (decodeRawHash p))
395+
enc
396+
dec
397+
enc
398+
dec
360399
, cBlockFetchCodec =
361400
codecBlockFetch
362401
enc
363402
dec
364-
(encodePoint (encodeRawHash p))
365-
(decodePoint (decodeRawHash p))
403+
enc
404+
dec
366405
, cBlockFetchCodecSerialised =
367406
codecBlockFetch
368407
enc
369408
dec
370-
(encodePoint (encodeRawHash p))
371-
(decodePoint (decodeRawHash p))
409+
enc
410+
dec
372411
, cTxSubmission2Codec =
373412
codecTxSubmission2
374413
enc
375414
dec
376415
enc
377416
dec
417+
, cPerasCertDiffusionCodec =
418+
codecObjectDiffusion
419+
enc
420+
dec
421+
enc
422+
dec
378423
, cKeepAliveCodec = codecKeepAlive_v2
379424
, cPeerSharingCodec = codecPeerSharing (encAddr nodeToNodeVersion) (decAddr nodeToNodeVersion)
380425
}
381426
where
382-
p :: Proxy blk
383-
p = Proxy
384-
385427
enc :: SerialiseNodeToNode blk a => a -> Encoding
386428
enc = encodeNodeToNode ccfg version
387429

@@ -401,6 +443,7 @@ identityCodecs ::
401443
(AnyMessage (BlockFetch blk (Point blk)))
402444
(AnyMessage (BlockFetch (Serialised blk) (Point blk)))
403445
(AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk)))
446+
(AnyMessage (PerasCertDiffusion blk))
404447
(AnyMessage KeepAlive)
405448
(AnyMessage (PeerSharing addr))
406449
identityCodecs =
@@ -410,6 +453,7 @@ identityCodecs =
410453
, cBlockFetchCodec = codecBlockFetchId
411454
, cBlockFetchCodecSerialised = codecBlockFetchId
412455
, cTxSubmission2Codec = codecTxSubmission2Id
456+
, cPerasCertDiffusionCodec = codecObjectDiffusionId
413457
, cKeepAliveCodec = codecKeepAliveId
414458
, cPeerSharingCodec = codecPeerSharingId
415459
}
@@ -432,6 +476,7 @@ data Tracers' peer ntnAddr blk e f = Tracers
432476
f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk))))
433477
, tTxSubmission2Tracer ::
434478
f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk))))
479+
, tPerasCertDiffusionTracer :: f (TraceLabelPeer peer (TraceSendRecv (PerasCertDiffusion blk)))
435480
, tKeepAliveTracer :: f (TraceLabelPeer peer (TraceSendRecv KeepAlive))
436481
, tPeerSharingTracer :: f (TraceLabelPeer peer (TraceSendRecv (PeerSharing ntnAddr)))
437482
}
@@ -444,6 +489,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer ntnAddr blk e f
444489
, tBlockFetchTracer = f tBlockFetchTracer
445490
, tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer
446491
, tTxSubmission2Tracer = f tTxSubmission2Tracer
492+
, tPerasCertDiffusionTracer = f tPerasCertDiffusionTracer
447493
, tKeepAliveTracer = f tKeepAliveTracer
448494
, tPeerSharingTracer = f tPeerSharingTracer
449495
}
@@ -464,6 +510,7 @@ nullTracers =
464510
, tBlockFetchTracer = nullTracer
465511
, tBlockFetchSerialisedTracer = nullTracer
466512
, tTxSubmission2Tracer = nullTracer
513+
, tPerasCertDiffusionTracer = nullTracer
467514
, tKeepAliveTracer = nullTracer
468515
, tPeerSharingTracer = nullTracer
469516
}
@@ -485,6 +532,7 @@ showTracers tr =
485532
, tBlockFetchTracer = showTracing tr
486533
, tBlockFetchSerialisedTracer = showTracing tr
487534
, tTxSubmission2Tracer = showTracing tr
535+
, tPerasCertDiffusionTracer = showTracing tr
488536
, tKeepAliveTracer = showTracing tr
489537
, tPeerSharingTracer = showTracing tr
490538
}
@@ -509,7 +557,7 @@ type ServerApp m addr bytes a =
509557
-- | Applications for the node-to-node protocols
510558
--
511559
-- See 'Network.Mux.Types.MuxApplication'
512-
data Apps m addr bCS bBF bTX bKA bPS a b = Apps
560+
data Apps m addr bCS bBF bTX bPCD bKA bPS a b = Apps
513561
{ aChainSyncClient :: ClientApp m addr bCS a
514562
-- ^ Start a chain sync client that communicates with the given upstream
515563
-- node.
@@ -525,6 +573,10 @@ data Apps m addr bCS bBF bTX bKA bPS a b = Apps
525573
-- given upstream node.
526574
, aTxSubmission2Server :: ServerApp m addr bTX b
527575
-- ^ Start a transaction submission v2 server.
576+
, aPerasCertDiffusionClient :: ClientApp m addr bPCD a
577+
-- ^ Start a Peras cert diffusion client.
578+
, aPerasCertDiffusionServer :: ServerApp m addr bPCD b
579+
-- ^ Start a Peras cert diffusion server.
528580
, aKeepAliveClient :: ClientApp m addr bKA a
529581
-- ^ Start a keep-alive client.
530582
, aKeepAliveServer :: ServerApp m addr bKA b
@@ -540,7 +592,7 @@ data Apps m addr bCS bBF bTX bKA bPS a b = Apps
540592
--
541593
-- They don't depend on the instantiation of the protocol parameters (which
542594
-- block type is used, etc.), hence the use of 'RankNTypes'.
543-
data ByteLimits bCS bBF bTX bKA = ByteLimits
595+
data ByteLimits bCS bBF bTX bPCD bKA = ByteLimits
544596
{ blChainSync ::
545597
forall header point tip.
546598
ProtocolSizeLimits
@@ -556,27 +608,34 @@ data ByteLimits bCS bBF bTX bKA = ByteLimits
556608
ProtocolSizeLimits
557609
(TxSubmission2 txid tx)
558610
bTX
611+
, blPerasCertDiffusion ::
612+
forall blk.
613+
ProtocolSizeLimits
614+
(PerasCertDiffusion blk)
615+
bPCD
559616
, blKeepAlive ::
560617
ProtocolSizeLimits
561618
KeepAlive
562619
bKA
563620
}
564621

565-
noByteLimits :: ByteLimits bCS bBF bTX bKA
622+
noByteLimits :: ByteLimits bCS bBF bTX bPCD bKA
566623
noByteLimits =
567624
ByteLimits
568625
{ blChainSync = byteLimitsChainSync (const 0)
569626
, blBlockFetch = byteLimitsBlockFetch (const 0)
570627
, blTxSubmission2 = byteLimitsTxSubmission2 (const 0)
628+
, blPerasCertDiffusion = byteLimitsObjectDiffusion (const 0)
571629
, blKeepAlive = byteLimitsKeepAlive (const 0)
572630
}
573631

574-
byteLimits :: ByteLimits ByteString ByteString ByteString ByteString
632+
byteLimits :: ByteLimits ByteString ByteString ByteString ByteString ByteString
575633
byteLimits =
576634
ByteLimits
577635
{ blChainSync = byteLimitsChainSync size
578636
, blBlockFetch = byteLimitsBlockFetch size
579637
, blTxSubmission2 = byteLimitsTxSubmission2 size
638+
, blPerasCertDiffusion = byteLimitsObjectDiffusion size
580639
, blKeepAlive = byteLimitsKeepAlive size
581640
}
582641
where
@@ -587,7 +646,7 @@ byteLimits =
587646

588647
-- | Construct the 'NetworkApplication' for the node-to-node protocols
589648
mkApps ::
590-
forall m addrNTN addrNTC blk e bCS bBF bTX bKA bPS.
649+
forall m addrNTN addrNTC blk e bCS bBF bTX bPCD bKA bPS.
591650
( IOLike m
592651
, MonadTimer m
593652
, Ord addrNTN
@@ -602,16 +661,16 @@ mkApps ::
602661
NodeKernel m addrNTN addrNTC blk ->
603662
StdGen ->
604663
Tracers m addrNTN blk e ->
605-
(NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bKA bPS) ->
606-
ByteLimits bCS bBF bTX bKA ->
664+
(NodeToNodeVersion -> Codecs blk addrNTN e m bCS bCS bBF bBF bTX bPCD bKA bPS) ->
665+
ByteLimits bCS bBF bTX bPCD bKA ->
607666
-- Chain-Sync timeouts for chain-sync client (using `Header blk`) as well as
608667
-- the server (`SerialisedHeader blk`).
609668
(forall header. ProtocolTimeLimitsWithRnd (ChainSync header (Point blk) (Tip blk))) ->
610669
CsClient.ChainSyncLoPBucketConfig ->
611670
CsClient.CSJConfig ->
612671
ReportPeerMetrics m (ConnectionId addrNTN) ->
613672
Handlers m addrNTN blk ->
614-
Apps m addrNTN bCS bBF bTX bKA bPS NodeToNodeInitiatorResult ()
673+
Apps m addrNTN bCS bBF bTX bPCD bKA bPS NodeToNodeInitiatorResult ()
615674
mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucketConfig csjConfig ReportPeerMetrics{..} Handlers{..} =
616675
Apps{..}
617676
where
@@ -790,6 +849,51 @@ mkApps kernel rng Tracers{..} mkCodecs ByteLimits{..} chainSyncTimeouts lopBucke
790849
channel
791850
(txSubmissionServerPeerPipelined (hTxSubmissionServer version them))
792851

852+
aPerasCertDiffusionClient ::
853+
NodeToNodeVersion ->
854+
ExpandedInitiatorContext addrNTN m ->
855+
Channel m bPCD ->
856+
m (NodeToNodeInitiatorResult, Maybe bPCD)
857+
aPerasCertDiffusionClient
858+
version
859+
ExpandedInitiatorContext
860+
{ eicConnectionId = them
861+
, eicControlMessage = controlMessageSTM
862+
}
863+
channel = do
864+
labelThisThread "PerasCertDiffusionClient"
865+
((), trailing) <-
866+
runPipelinedPeerWithLimits
867+
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
868+
(cPerasCertDiffusionCodec (mkCodecs version))
869+
blPerasCertDiffusion
870+
timeLimitsObjectDiffusion
871+
channel
872+
( objectDiffusionInboundPeerPipelined
873+
(hPerasCertDiffusionClient version controlMessageSTM them)
874+
)
875+
return (NoInitiatorResult, trailing)
876+
877+
aPerasCertDiffusionServer ::
878+
NodeToNodeVersion ->
879+
ResponderContext addrNTN ->
880+
Channel m bPCD ->
881+
m ((), Maybe bPCD)
882+
aPerasCertDiffusionServer
883+
version
884+
ResponderContext{rcConnectionId = them}
885+
channel = do
886+
labelThisThread "PerasCertDiffusionServer"
887+
runPeerWithLimits
888+
(TraceLabelPeer them `contramap` tPerasCertDiffusionTracer)
889+
(cPerasCertDiffusionCodec (mkCodecs version))
890+
blPerasCertDiffusion
891+
timeLimitsObjectDiffusion
892+
channel
893+
( objectDiffusionOutboundPeer
894+
(hPerasCertDiffusionServer version them)
895+
)
896+
793897
aKeepAliveClient ::
794898
NodeToNodeVersion ->
795899
ExpandedInitiatorContext addrNTN m ->
@@ -893,7 +997,7 @@ initiator ::
893997
MiniProtocolParameters ->
894998
NodeToNodeVersion ->
895999
NodeToNodeVersionData ->
896-
Apps m addr b b b b b a c ->
1000+
Apps m addr b b b b b b a c ->
8971001
OuroborosBundleWithExpandedCtx 'Mux.InitiatorMode addr b m a Void
8981002
initiator miniProtocolParameters version versionData Apps{..} =
8991003
nodeToNodeProtocols
@@ -911,6 +1015,8 @@ initiator miniProtocolParameters version versionData Apps{..} =
9111015
(InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aBlockFetchClient version ctx)))
9121016
, txSubmissionProtocol =
9131017
(InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aTxSubmission2Client version ctx)))
1018+
, perasCertDiffusionProtocol =
1019+
(InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aPerasCertDiffusionClient version ctx)))
9141020
, keepAliveProtocol =
9151021
(InitiatorProtocolOnly (MiniProtocolCb (\ctx -> aKeepAliveClient version ctx)))
9161022
, peerSharingProtocol =
@@ -929,7 +1035,7 @@ initiatorAndResponder ::
9291035
MiniProtocolParameters ->
9301036
NodeToNodeVersion ->
9311037
NodeToNodeVersionData ->
932-
Apps m addr b b b b b a c ->
1038+
Apps m addr b b b b b b a c ->
9331039
OuroborosBundleWithExpandedCtx 'Mux.InitiatorResponderMode addr b m a c
9341040
initiatorAndResponder miniProtocolParameters version versionData Apps{..} =
9351041
nodeToNodeProtocols
@@ -950,6 +1056,11 @@ initiatorAndResponder miniProtocolParameters version versionData Apps{..} =
9501056
(MiniProtocolCb (\initiatorCtx -> aTxSubmission2Client version initiatorCtx))
9511057
(MiniProtocolCb (\responderCtx -> aTxSubmission2Server version responderCtx))
9521058
)
1059+
, perasCertDiffusionProtocol =
1060+
( InitiatorAndResponderProtocol
1061+
(MiniProtocolCb (\initiatorCtx -> aPerasCertDiffusionClient version initiatorCtx))
1062+
(MiniProtocolCb (\responderCtx -> aPerasCertDiffusionServer version responderCtx))
1063+
)
9531064
, keepAliveProtocol =
9541065
( InitiatorAndResponderProtocol
9551066
(MiniProtocolCb (\initiatorCtx -> aKeepAliveClient version initiatorCtx))

0 commit comments

Comments
 (0)