-
Notifications
You must be signed in to change notification settings - Fork 220
/
Copy pathPools.hs
638 lines (589 loc) · 21.4 KB
/
Pools.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
-- |
-- Copyright: © 2020 IOHK
-- License: Apache-2.0
--
-- This module provides tools to collect a consistent view of stake pool data,
-- as provided through @StakePoolLayer@.
module Cardano.Wallet.Shelley.Pools
( StakePoolLayer (..)
, newStakePoolLayer
, monitorStakePools
, monitorMetadata
-- * Logs
, StakePoolLog (..)
)
where
import Prelude
import Cardano.BM.Data.Severity
( Severity (..) )
import Cardano.BM.Data.Tracer
( HasPrivacyAnnotation (..), HasSeverityAnnotation (..) )
import Cardano.Pool.DB
( DBLayer (..), ErrPointAlreadyExists (..), readPoolLifeCycleStatus )
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog )
import Cardano.Wallet.Api.Types
( ApiT (..) )
import Cardano.Wallet.Network
( ErrCurrentNodeTip (..)
, ErrNetworkUnavailable (..)
, FollowAction (..)
, FollowExit (..)
, FollowLog
, NetworkLayer (..)
, follow
)
import Cardano.Wallet.Primitive.Slotting
( TimeInterpreter, firstSlotInEpoch, startTime )
import Cardano.Wallet.Primitive.Types
( ActiveSlotCoefficient (..)
, BlockHeader
, CertificatePublicationTime (..)
, Coin (..)
, EpochNo (..)
, GenesisParameters (..)
, PoolCertificate (..)
, PoolId
, PoolLifeCycleStatus (..)
, PoolRegistrationCertificate (..)
, PoolRetirementCertificate (..)
, ProtocolParameters (..)
, SlotLength (..)
, SlotNo
, StakePoolMetadata
, StakePoolMetadataHash
, StakePoolMetadataUrl
, getPoolRegistrationCertificate
, getPoolRetirementCertificate
)
import Cardano.Wallet.Shelley.Compatibility
( Shelley
, getProducer
, poolCertsFromShelleyBlock
, toCardanoBlockHeader
, toShelleyBlockHeader
)
import Cardano.Wallet.Shelley.Network
( NodePoolLsqData (..) )
import Cardano.Wallet.Unsafe
( unsafeMkPercentage )
import Control.Concurrent
( threadDelay )
import Control.Monad
( forM, forM_, forever, when )
import Control.Monad.IO.Class
( liftIO )
import Control.Monad.Trans.Except
( ExceptT (..), runExceptT, withExceptT )
import Control.Monad.Trans.State
( State, evalState, state )
import Control.Tracer
( Tracer, contramap, traceWith )
import Data.Function
( (&) )
import Data.Generics.Internal.VL.Lens
( view )
import Data.List.NonEmpty
( NonEmpty (..) )
import Data.Map
( Map )
import Data.Map.Merge.Strict
( dropMissing, traverseMissing, zipWithAMatched, zipWithMatched )
import Data.Maybe
( catMaybes )
import Data.Ord
( Down (..) )
import Data.Quantity
( Percentage (..), Quantity (..) )
import Data.Set
( Set )
import Data.Text.Class
( ToText (..) )
import Data.Word
( Word64 )
import Fmt
( fixedF, pretty )
import GHC.Generics
( Generic )
import Ouroboros.Consensus.Cardano.Block
( CardanoBlock, HardForkBlock (..) )
import Ouroboros.Consensus.Shelley.Protocol
( TPraosCrypto )
import System.Random
( RandomGen, random )
import qualified Cardano.Wallet.Api.Types as Api
import qualified Data.List as L
import qualified Data.Map.Merge.Strict as Map
import qualified Data.Map.Strict as Map
--
-- Stake Pool Layer
--
-- | Interface for querying stake pool data. 'newStakePoolLayer' below builds
-- an instance of it from a network layer and a pool database.
data StakePoolLayer = StakePoolLayer
    { -- | Look up the life-cycle status of the given pool.
      getPoolLifeCycleStatus
        :: PoolId
        -> IO PoolLifeCycleStatus

      -- | The identifiers of all pools known to this layer.
    , knownPools
        :: IO (Set PoolId)

    -- | List pools based on the amount of stake the user intends to
    -- delegate, which affects the size of the rewards and the ranking of
    -- the pools.
    --
    -- Pools with a retirement epoch earlier than or equal to the specified
    -- epoch will be excluded from the result.
    --
    , listStakePools
        :: EpochNo
        -- Exclude all pools that retired in or before this epoch.
        -> Coin
        -> ExceptT ErrNetworkUnavailable IO [Api.ApiStakePool]
    }
-- | Construct a 'StakePoolLayer' from a network layer (queried for the node
-- tip, the stake distribution and the time interpreter) and a pool database.
newStakePoolLayer
    :: forall sc. ()
    => NetworkLayer IO (IO Shelley) (CardanoBlock sc)
    -> DBLayer IO
    -> StakePoolLayer
newStakePoolLayer nl db@DBLayer {..} = StakePoolLayer
    { getPoolLifeCycleStatus = lifeCycleStatusOf
    , knownPools = allKnownPools
    , listStakePools = listPoolsAt
    }
  where
    -- Read the life-cycle status of a single pool in one DB transaction.
    lifeCycleStatusOf :: PoolId -> IO PoolLifeCycleStatus
    lifeCycleStatusOf pid =
        liftIO (atomically (readPoolLifeCycleStatus pid))

    -- Every pool for which 'readPoolDbData' produces an entry.
    allKnownPools :: IO (Set PoolId)
    allKnownPools = fmap Map.keysSet (liftIO (readPoolDbData db))

    -- Query the node and the database, merge the results, drop retired
    -- pools and rank the remainder by non-myopic member rewards.
    listPoolsAt
        :: EpochNo
        -- Exclude all pools that retired in or before this epoch.
        -> Coin
        -> ExceptT ErrNetworkUnavailable IO [Api.ApiStakePool]
    listPoolsAt currentEpoch userStake = do
        nodeTip <- withExceptT tipErrorToNetworkError (currentNodeTip nl)
        nodeLsqData <- stakeDistribution nl nodeTip userStake
        poolsFromDb <- liftIO (readPoolDbData db)
        randomSeed <- liftIO (atomically readSystemSeed)
        -- TODO:
        -- Use a more efficient way of filtering out retired pools.
        -- See: https://jira.iohk.io/projects/ADP/issues/ADP-383
        poolsById <- liftIO $ combineDbAndLsqData
            (timeInterpreter nl)
            (nOpt nodeLsqData)
            (combineLsqData nodeLsqData)
            poolsFromDb
        pure
            $ sortByReward randomSeed
            $ filter isNotRetired
            $ Map.elems poolsById
      where
        tipErrorToNetworkError :: ErrCurrentNodeTip -> ErrNetworkUnavailable
        tipErrorToNetworkError (ErrCurrentNodeTipNetworkUnreachable e) = e
        tipErrorToNetworkError ErrCurrentNodeTipNotFound =
            ErrNetworkUnreachable "tip not found"

        -- A pool is kept unless it has a retirement epoch that is at or
        -- before the current epoch.
        isNotRetired :: Api.ApiStakePool -> Bool
        isNotRetired pool = case retirementEpochOf pool of
            Nothing -> True
            Just epoch -> epoch > currentEpoch

        retirementEpochOf :: Api.ApiStakePool -> Maybe EpochNo
        retirementEpochOf pool = pool
            & view #retirement
            & fmap (view (#epochNumber . #getApiT))

    -- Sort by non-myopic member rewards (highest first), making sure to also
    -- randomly sort pools that have equal rewards: pools are first ordered by
    -- a random weight, then stably re-sorted by rewards.
    --
    -- NOTE: we discard the final value of the random generator because we
    -- do actually want the order to be stable between two identical
    -- requests. The order simply needs to be different between different
    -- instances of the server.
    sortByReward
        :: RandomGen g
        => g
        -> [Api.ApiStakePool]
        -> [Api.ApiStakePool]
    sortByReward seed pools =
        map snd
            $ L.sortOn (Down . memberRewardsOf . snd)
            $ L.sortOn fst
            $ evalState (traverse tagWithWeight pools) seed
      where
        -- Draw one random 'Int' per pool, in list order.
        tagWithWeight :: RandomGen gen => a -> State gen (Int, a)
        tagWithWeight x = do
            weight <- state random
            pure (weight, x)

        memberRewardsOf = view (#metrics . #nonMyopicMemberRewards)
--
-- Data Combination functions
--
-- | Stake pool-related data that has been read from the node using a local
-- state query.
data PoolLsqData = PoolLsqData
    { nonMyopicMemberRewards :: Quantity "lovelace" Word64
      -- ^ Non-myopic member rewards for the pool, as reported by the node.
    , relativeStake :: Percentage
      -- ^ The pool's relative stake, as reported by the node.
    , saturation :: Double
      -- ^ Derived from 'relativeStake' and the desired number of pools
      -- (@nOpt@) in 'combineLsqData'.
    } deriving (Eq, Show, Generic)
-- | Stake pool-related data that has been read from the database.
--
-- Values are assembled by 'combineChainData'.
data PoolDbData = PoolDbData
    { registrationCert :: PoolRegistrationCertificate
      -- ^ The pool's registration certificate.
    , retirementCert :: Maybe PoolRetirementCertificate
      -- ^ The pool's retirement certificate, if it has one.
    , nProducedBlocks :: Quantity "block" Word64
      -- ^ Number of blocks produced by the pool; 0 when none are recorded.
    , metadata :: Maybe StakePoolMetadata
      -- ^ Metadata matching the hash announced in the registration
      -- certificate, when it is present in the database.
    }
-- | Top level combine-function that merges DB and LSQ data.
--
-- * Pools present in the LSQ data but absent from the database are dropped.
-- * Pools present only in the database are kept, with default LSQ metrics
--   (see 'dbButNoLsq').
-- * Retirement epochs are converted into 'Api.ApiEpochInfo' values using the
--   given 'TimeInterpreter'. This is why the result lives in @m@.
combineDbAndLsqData
    :: forall m. (Monad m)
    => TimeInterpreter m
    -> Int -- ^ nOpt; desired number of pools
    -> Map PoolId PoolLsqData
    -> Map PoolId PoolDbData
    -> m (Map PoolId Api.ApiStakePool)
combineDbAndLsqData ti nOpt lsqData =
    Map.mergeA lsqButNoDb dbButNoLsq bothPresent lsqData
  where
    bothPresent = zipWithAMatched mkApiPool

    lsqButNoDb = dropMissing

    -- When a pool is registered (with a registration certificate) but not
    -- currently active (and therefore not causing pool metrics data to be
    -- available over local state query), we use a default value of /zero/
    -- for all stake pool metric values so that the pool can still be
    -- included in the list of all known stake pools:
    --
    dbButNoLsq = traverseMissing $ \k db ->
        mkApiPool k lsqDefault db
      where
        lsqDefault = PoolLsqData
            { nonMyopicMemberRewards = freshmanMemberRewards
            , relativeStake = minBound
            , saturation = 0
            }

    -- To give a chance to freshly registered pools that haven't been part of
    -- any leader schedule, we assign them the average reward of the top @k@
    -- pools.
    --
    -- The raw 'Word64' values are sorted in descending order (rather than
    -- doing arithmetic on 'Down' values), and summed as 'Integer' so that a
    -- large total cannot wrap around.
    freshmanMemberRewards
        = Quantity
        $ average
        $ L.take nOpt
        $ L.sortOn Down
        $ map (getQuantity . nonMyopicMemberRewards)
        $ Map.elems lsqData
      where
        average :: [Word64] -> Word64
        average [] = 0
        average xs =
            round $ double (sum (map toInteger xs)) / double (length xs)

        double :: Integral a => a -> Double
        double = fromIntegral

    -- Build the API representation of a pool from its LSQ and DB data.
    mkApiPool
        :: PoolId
        -> PoolLsqData
        -> PoolDbData
        -> m Api.ApiStakePool
    mkApiPool pid (PoolLsqData prew pstk psat) dbData = do
        let mRetirementEpoch = retirementEpoch <$> retirementCert dbData
        retirementEpochInfo <- traverse toApiEpochInfo mRetirementEpoch
        pure $ Api.ApiStakePool
            { Api.id = ApiT pid
            , Api.metrics = Api.ApiStakePoolMetrics
                { Api.nonMyopicMemberRewards = fmap fromIntegral prew
                , Api.relativeStake = Quantity pstk
                , Api.saturation = psat
                , Api.producedBlocks =
                    (fmap fromIntegral . nProducedBlocks) dbData
                }
            , Api.metadata =
                ApiT <$> metadata dbData
            , Api.cost =
                fmap fromIntegral $ poolCost $ registrationCert dbData
            , Api.pledge =
                fmap fromIntegral $ poolPledge $ registrationCert dbData
            , Api.margin =
                Quantity $ poolMargin $ registrationCert dbData
            , Api.retirement = retirementEpochInfo
            }

    -- Pair an epoch with the start time of its first slot.
    toApiEpochInfo ep = do
        time <- ti $ startTime =<< firstSlotInEpoch ep
        return $ Api.ApiEpochInfo (ApiT ep) time
-- | Combines all the LSQ data into a single map.
--
-- This is the data we can ask the node for the most recent version of, over the
-- local state query protocol.
--
-- Calculating e.g. the nonMyopicMemberRewards ourselves through chain-following
-- would be completely impractical.
--
-- Pools with stake but no rewards get zero rewards. Pools with rewards but no
-- stake get zero relative stake, and hence zero saturation.
combineLsqData
    :: NodePoolLsqData
    -> Map PoolId PoolLsqData
combineLsqData NodePoolLsqData{nOpt, rewards, stake} =
    Map.merge stakeButNoRewards rewardsButNoStake bothPresent stake rewards
  where
    -- Calculate the saturation from the relative stake: the relative stake
    -- divided by @1 / nOpt@, i.e. multiplied by @nOpt@. Multiplying directly
    -- is exact in 'Rational'. Unlike dividing by @1 / nOpt@, it does not
    -- throw "Ratio has zero denominator" when @nOpt@ is 0 (saturation is then
    -- reported as 0).
    sat :: Percentage -> Double
    sat s = fromRational $ getPercentage s * fromIntegral nOpt

    -- If we fetch non-myopic member rewards of pools using the wallet
    -- balance of 0, the resulting map will be empty. So we set the rewards
    -- to 0 here:
    stakeButNoRewards = traverseMissing $ \_k s -> pure $ PoolLsqData
        { nonMyopicMemberRewards = Quantity 0
        , relativeStake = s
        , saturation = sat s
        }

    -- TODO: This case seems possible on shelley_testnet, but why, and how
    -- should we treat it?
    --
    -- The pool with rewards but not stake didn't seem to be retiring.
    rewardsButNoStake = traverseMissing $ \_k r -> pure $ PoolLsqData
        { nonMyopicMemberRewards = r
        , relativeStake = noStake
        , saturation = sat noStake
        }
      where
        noStake = unsafeMkPercentage 0

    bothPresent = zipWithMatched $ \_k s r -> PoolLsqData r s (sat s)
-- | Combines all the chain-following data into a single map, keyed by pool.
--
-- The result contains exactly the pools that have a registration
-- certificate:
--
-- * Production counts for pools without one are dropped.
-- * Registered pools without recorded production get a count of zero.
--
-- The retirement certificate is attached when one is present. So is the
-- metadata, looked up by the hash announced in the registration certificate.
combineChainData
    :: Map PoolId PoolRegistrationCertificate
    -> Map PoolId PoolRetirementCertificate
    -> Map PoolId (Quantity "block" Word64)
    -> Map StakePoolMetadataHash StakePoolMetadata
    -> Map PoolId PoolDbData
combineChainData registrationMap retirementMap prodMap metaMap =
    Map.merge
        registeredButNotProducing
        producingButNotRegistered
        registeredAndProducing
        registrationMap
        prodMap
  where
    registeredButNotProducing = traverseMissing $ \_pid cert ->
        pure (toPoolDbData cert (Quantity 0))

    -- Ignore blocks produced by BFT nodes.
    producingButNotRegistered = dropMissing

    registeredAndProducing = zipWithMatched $ \_pid cert count ->
        toPoolDbData cert count

    toPoolDbData
        :: PoolRegistrationCertificate
        -> Quantity "block" Word64
        -> PoolDbData
    toPoolDbData cert count = PoolDbData
        { registrationCert = cert
        , retirementCert = Map.lookup (view #poolId cert) retirementMap
        , nProducedBlocks = count
        , metadata = poolMetadata cert >>= \(_url, metaHash) ->
            Map.lookup metaHash metaMap
        }
-- TODO:
--
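-- To determine pool life-cycle statuses, this function currently executes a
-- total of (2n + 1) database queries, where n is the total number of pools
-- with entries in the pool registrations table. (It additionally reads the
-- total block production and the pool metadata, which are not counted here.)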
--
-- Specifically:
--
-- 1. We first execute a query to determine the complete set of all pools
-- (including those that may have retired).
--
-- 2. For each pool, we determine its current life-cycle status by executing
-- a pair of queries to fetch:
--
-- a. The most recent registration certificate.
-- b. The most recent retirement certificate.
--
-- This is almost certainly not optimal.
--
-- If performance becomes a problem, we should investigate ways to reduce the
-- number of queries required:
--
-- See: https://jira.iohk.io/browse/ADP-383
--
-- Additionally, we can consider performing garbage collection of retired pools
-- from the database:
--
-- See: https://jira.iohk.io/browse/ADP-376
--
-- | Read the chain-derived data for every registered pool within a single
-- database transaction, and combine it with 'combineChainData'.
readPoolDbData :: DBLayer IO -> IO (Map PoolId PoolDbData)
readPoolDbData DBLayer {..} = atomically $ do
    poolIds <- listRegisteredPools
    statuses <- traverse readPoolLifeCycleStatus poolIds
    let certificatesBy
            :: forall c . (PoolLifeCycleStatus -> Maybe c) -> Map PoolId c
        certificatesBy select = Map.fromList
            [ (pid, cert)
            | (pid, status) <- zip poolIds statuses
            , Just cert <- [select status]
            ]
        registrations = certificatesBy getPoolRegistrationCertificate
        retirements = certificatesBy getPoolRetirementCertificate
    productions <- readTotalProduction
    metadataByHash <- readPoolMetadata
    pure $ combineChainData registrations retirements productions metadataByHash
--
-- Monitoring stake pool
--
-- | Follow the chain starting from the cursor stored in the pool database.
-- For every Shelley block, record its producer and any pool registration or
-- retirement certificates it contains.
--
-- On rollback, the database is rolled back to the given slot and this
-- function calls itself again, re-reading the cursor. When the follower is
-- interrupted or fails, a message is traced and the function returns.
monitorStakePools
    :: forall t sc. (TPraosCrypto sc)
    => Tracer IO StakePoolLog
    -> GenesisParameters
    -> NetworkLayer IO t (CardanoBlock sc)
    -> DBLayer IO
    -> IO ()
monitorStakePools tr gp nl db@DBLayer{..} = do
    cursor <- initCursor
    traceWith tr $ MsgStartMonitoring cursor
    follow nl (contramap MsgFollow tr) cursor forward getHeader >>= \case
        FollowInterrupted -> traceWith tr MsgHaltMonitoring
        FollowFailure -> traceWith tr MsgCrashMonitoring
        FollowRollback point -> do
            traceWith tr $ MsgRollingBackTo point
            liftIO . atomically $ rollbackTo point
            -- Restart monitoring from the rolled-back database state.
            monitorStakePools tr gp nl db
  where
    GenesisParameters
        { getGenesisBlockHash
        , getEpochStability
        } = gp

    -- Read the starting cursor from the database. The argument is the larger
    -- of 100 and the epoch stability parameter k. NOTE(review): presumably
    -- this is the number of recent headers to use as the cursor — confirm
    -- against 'readPoolProductionCursor'.
    initCursor :: IO [BlockHeader]
    initCursor = atomically $ readPoolProductionCursor (max 100 k)
      where k = fromIntegral $ getQuantity getEpochStability

    getHeader :: CardanoBlock sc -> BlockHeader
    getHeader = toCardanoBlockHeader gp

    -- Called by 'follow' with each batch of new blocks. The node tip and
    -- protocol parameters passed alongside are not used. The whole batch is
    -- processed inside a single 'atomically' call, after which the follower
    -- is told to continue.
    forward
        :: NonEmpty (CardanoBlock sc)
        -> (BlockHeader, ProtocolParameters)
        -> IO (FollowAction ())
    forward blocks (_nodeTip, _pparams) = do
        atomically $ forM_ blocks $ \case
            -- Byron blocks are ignored by this monitor.
            BlockByron _ -> pure ()
            BlockShelley blk -> do
                let (slot, certificates) = poolCertsFromShelleyBlock blk
                let header = toShelleyBlockHeader getGenesisBlockHash blk
                -- Record the block's producer. A failure is traced as
                -- 'MsgErrProduction' and does not stop the batch.
                runExceptT (putPoolProduction header (getProducer blk))
                    >>= \case
                        Left e ->
                            liftIO $ traceWith tr $ MsgErrProduction e
                        Right () ->
                            pure ()
                -- A single block can contain multiple certificates relating to the
                -- same pool.
                --
                -- The /order/ in which certificates appear is /significant/:
                -- certificates that appear later in a block /generally/ take
                -- precedence over certificates that appear earlier on.
                --
                -- We record /all/ certificates within the database, together with
                -- the order in which they appeared.
                --
                -- Precedence is determined by the 'readPoolLifeCycleStatus'
                -- function.
                --
                -- Each certificate's publication time pairs the block's slot
                -- with its position in the block, counting up from 'minBound'.
                let publicationTimes =
                        CertificatePublicationTime slot <$> [minBound ..]
                forM_ (publicationTimes `zip` certificates) $ \case
                    (publicationTime, Registration cert) -> do
                        liftIO $ traceWith tr $ MsgStakePoolRegistration cert
                        putPoolRegistration publicationTime cert
                    (publicationTime, Retirement cert) -> do
                        liftIO $ traceWith tr $ MsgStakePoolRetirement cert
                        putPoolRetirement publicationTime cert
        pure Continue
-- | Loop forever, fetching metadata for pools whose metadata has not been
-- fetched yet. Each round handles up to 100 references.
--
-- A successful fetch stores the metadata under its hash. A failed fetch is
-- recorded as a fetch attempt for the (url, hash) pair. A round may have
-- nothing to fetch, or none of its fetches may succeed. In either case the
-- worker traces 'MsgFetchTakeBreak' and sleeps for roughly one block interval
-- before starting the next round.
monitorMetadata
    :: Tracer IO StakePoolLog
    -> GenesisParameters
    -> ( PoolId
         -> StakePoolMetadataUrl
         -> StakePoolMetadataHash
         -> IO (Maybe StakePoolMetadata)
       )
    -> DBLayer IO
    -> IO ()
monitorMetadata tr gp fetchMetadata DBLayer{..} = forever fetchRound
  where
    -- One round: fetch every pending reference, then maybe pause.
    fetchRound = do
        pending <- atomically (unfetchedPoolMetadataRefs 100)
        outcomes <- forM pending fetchOne
        let fetchedHashes = catMaybes outcomes
        when (null pending || null fetchedHashes) takeBreak

    -- Fetch a single reference. Returns the hash on success.
    fetchOne (pid, url, hash) = do
        result <- fetchMetadata pid url hash
        case result of
            Nothing -> do
                atomically $ putFetchAttempt (url, hash)
                pure Nothing
            Just meta -> do
                atomically $ putPoolMetadata hash meta
                pure (Just hash)

    takeBreak = do
        traceWith tr $ MsgFetchTakeBreak blockFrequency
        threadDelay blockFrequency

    -- NOTE
    -- If there's no metadata, we typically need not to retry sooner than the
    -- next block. So waiting for a delay that is roughly the same order of
    -- magnitude as the (slot length / active slot coeff) sounds sound.
    --
    -- The slot length is converted from picoseconds ('fromEnum' on the
    -- duration) to microseconds, the unit expected by 'threadDelay'.
    blockFrequency = ceiling (1 / activeSlotCoeff) * slotLengthMicros
      where
        slotLengthMicros =
            fromEnum (unSlotLength (getSlotLength gp)) `div` 1000000
        activeSlotCoeff =
            unActiveSlotCoefficient (getActiveSlotCoefficient gp)
-- | Log messages for stake pool monitoring and pool metadata fetching.
data StakePoolLog
    = MsgFollow FollowLog
      -- ^ A message from the underlying chain follower.
    | MsgStartMonitoring [BlockHeader]
      -- ^ Monitoring (re)started from the given cursor.
    | MsgHaltMonitoring
      -- ^ The chain follower was interrupted.
    | MsgCrashMonitoring
      -- ^ The chain follower exited with a failure.
    | MsgRollingBackTo SlotNo
      -- ^ The pool database is being rolled back to this slot.
    | MsgStakePoolRegistration PoolRegistrationCertificate
      -- ^ A pool registration certificate was found in a block.
    | MsgStakePoolRetirement PoolRetirementCertificate
      -- ^ A pool retirement certificate was found in a block.
    | MsgErrProduction ErrPointAlreadyExists
      -- ^ Storing a block's producer failed with this error.
    | MsgFetchPoolMetadata StakePoolMetadataFetchLog
      -- ^ A message relating to fetching pool metadata.
    | MsgFetchTakeBreak Int
      -- ^ The metadata worker is pausing for this many microseconds.
    deriving (Show, Eq)
instance HasPrivacyAnnotation StakePoolLog

-- | Severity of each 'StakePoolLog' message. Wrapped follower and metadata
-- logs keep the severity of the message they wrap.
instance HasSeverityAnnotation StakePoolLog where
    getSeverityAnnotation (MsgFollow e) = getSeverityAnnotation e
    getSeverityAnnotation (MsgFetchPoolMetadata e) = getSeverityAnnotation e
    getSeverityAnnotation MsgStartMonitoring{} = Info
    getSeverityAnnotation MsgHaltMonitoring{} = Info
    getSeverityAnnotation MsgRollingBackTo{} = Info
    getSeverityAnnotation MsgStakePoolRegistration{} = Info
    getSeverityAnnotation MsgStakePoolRetirement{} = Info
    getSeverityAnnotation MsgCrashMonitoring{} = Error
    getSeverityAnnotation MsgErrProduction{} = Error
    getSeverityAnnotation MsgFetchTakeBreak{} = Debug
-- | Human-readable rendering of 'StakePoolLog' messages.
instance ToText StakePoolLog where
    toText = \case
        MsgFollow e ->
            toText e
        MsgStartMonitoring cursor -> mconcat
            [ "Monitoring stake pools. Currently at "
            -- Report the last header of the cursor, or "genesis" when the
            -- cursor is empty. Matching on the reversed list keeps this total
            -- (no partial 'last').
            , case reverse cursor of
                [] -> "genesis"
                lastHeader : _ -> pretty lastHeader
            ]
        MsgHaltMonitoring ->
            "Stopping stake pool monitoring as requested."
        MsgCrashMonitoring -> mconcat
            [ "Chain follower exited with error. "
            , "Worker will no longer monitor stake pools."
            ]
        MsgRollingBackTo point ->
            "Rolling back to " <> pretty point
        MsgStakePoolRegistration cert ->
            "Discovered stake pool registration: " <> pretty cert
        MsgStakePoolRetirement cert ->
            "Discovered stake pool retirement: " <> pretty cert
        MsgErrProduction (ErrPointAlreadyExists blk) -> mconcat
            [ "Couldn't store production for given block because it conflicts "
            , "with another block. Conflicting block header is: ", pretty blk
            ]
        MsgFetchPoolMetadata e ->
            toText e
        -- The delay is in microseconds; render it in seconds.
        MsgFetchTakeBreak delay -> mconcat
            [ "Taking a little break from fetching metadata, "
            , "back to it in about "
            , pretty (fixedF 1 (toRational delay / 1000000)), "s"
            ]