-
Notifications
You must be signed in to change notification settings - Fork 149
/
Copy paththrottle.go
491 lines (423 loc) · 19 KB
/
throttle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
package keeper
import (
"encoding/binary"
"fmt"
"time"
sdktypes "github.com/cosmos/cosmos-sdk/types"
providertypes "github.com/cosmos/interchain-security/x/ccv/provider/types"
ccvtypes "github.com/cosmos/interchain-security/x/ccv/types"
tmtypes "github.com/tendermint/tendermint/types"
)
// This file contains functionality relevant to the throttling of slash and vsc matured packets, aka circuit breaker logic.
// HandleThrottleQueues iterates over the global slash entry queue, and
// handles all or some portion of throttled (slash and/or VSC matured) packet data received from
// consumer chains. The slash meter is decremented appropriately in this method.
func (k Keeper) HandleThrottleQueues(ctx sdktypes.Context) {
meter := k.GetSlashMeter(ctx)
// Don't start iterating if meter is negative in value
if meter.IsNegative() {
return
}
handledGlobalEntries := []providertypes.GlobalSlashEntry{}
// Iterate through ordered (by received time) global slash entries from any consumer chain
k.IterateGlobalSlashEntries(ctx, func(entry providertypes.GlobalSlashEntry) (stop bool) {
// Obtain validator from the provider's consensus address.
// Note: if validator is not found or unbonded, this will be handled appropriately in HandleSlashPacket
val, found := k.stakingKeeper.GetValidatorByConsAddr(ctx, entry.ProviderValConsAddr)
// Obtain the validator power relevant to the slash packet that's about to be handled
// (this power will be removed via jailing or tombstoning)
var valPower int64
if !found || val.IsJailed() {
// If validator is not found, or found but jailed, it's power is 0. This path is explicitly defined since the
// staking keeper's LastValidatorPower values are not updated till the staking keeper's endblocker.
valPower = 0
} else {
valPower = k.stakingKeeper.GetLastValidatorPower(ctx, val.GetOperator())
}
// Subtract this power from the slash meter
meter = meter.Sub(sdktypes.NewInt(valPower))
// Handle slash and any trailing vsc matured packet data instances by passing in
// chainID and appropriate callbacks, relevant packet data is deleted in this method.
k.HandlePacketDataForChain(ctx, entry.ConsumerChainID, k.HandleSlashPacket, k.HandleVSCMaturedPacket)
// Store handled global entry to be deleted after iteration is completed
handledGlobalEntries = append(handledGlobalEntries, entry)
// Do not handle anymore global slash entries if the meter is negative in value
stop = meter.IsNegative()
return stop
})
// Handled global entries are deleted after iteration is completed
k.DeleteGlobalSlashEntries(ctx, handledGlobalEntries...)
// Persist current value for slash meter
k.SetSlashMeter(ctx, meter)
}
// HandlePacketDataForChain handles only the first queued slash packet relevant to the passed consumer chainID,
// and then handles any trailing vsc matured packets in that (consumer chain specific) throttled packet data queue.
//
// Note: Any packet data which is handled in this method is also deleted from the (consumer chain specific) queue.
func (k Keeper) HandlePacketDataForChain(ctx sdktypes.Context, consumerChainID string,
slashPacketHandler func(sdktypes.Context, string, ccvtypes.SlashPacketData),
vscMaturedPacketHandler func(sdktypes.Context, string, ccvtypes.VSCMaturedPacketData),
) {
// Store ibc sequence numbers to delete data after iteration is completed
seqNums := []uint64{}
slashPacketHandled := false
k.IterateThrottledPacketData(ctx, consumerChainID, func(ibcSeqNum uint64, data interface{}) (stop bool) {
switch data := data.(type) {
case ccvtypes.SlashPacketData:
if slashPacketHandled {
// Break iteration, since we've already handled one slash packet
stop = true
return stop
} else {
// Handle slash packet and set flag to true
slashPacketHandler(ctx, consumerChainID, data)
slashPacketHandled = true
}
case ccvtypes.VSCMaturedPacketData:
vscMaturedPacketHandler(ctx, consumerChainID, data)
default:
panic(fmt.Sprintf("unexpected pending packet data type: %T", data))
}
seqNums = append(seqNums, ibcSeqNum)
// Continue iterating through the queue until we reach the end or a 2nd slash packet
stop = false
return stop
})
// Delete handled data after iteration is completed
k.DeleteThrottledPacketData(ctx, consumerChainID, seqNums...)
}
// InitializeSlashMeter initializes the slash meter to it's max value (also its allowance),
// and sets the last replenish time to the current block time.
func (k Keeper) InitializeSlashMeter(ctx sdktypes.Context) {
k.SetSlashMeter(ctx, k.GetSlashMeterAllowance(ctx))
k.SetLastSlashMeterFullTime(ctx, ctx.BlockTime())
}
// CheckForSlashMeterReplenishment checks if the slash meter should be replenished, and if so, replenishes it.
// Note: initial "last slash meter full time" is set in InitGenesis.
func (k Keeper) CheckForSlashMeterReplenishment(ctx sdktypes.Context) {
lastFullTime := k.GetLastSlashMeterFullTime(ctx)
replenishPeriod := k.GetSlashMeterReplenishPeriod(ctx)
// Replenish slash meter if enough time has passed since the last time it was full.
if ctx.BlockTime().UTC().After(lastFullTime.Add(replenishPeriod)) {
k.ReplenishSlashMeter(ctx)
}
// If slash meter is full, or more than full considering updated allowance/total power,
allowance := k.GetSlashMeterAllowance(ctx)
if k.GetSlashMeter(ctx).GTE(allowance) {
// set the most recent time the slash meter was full to current block time.
k.SetLastSlashMeterFullTime(ctx, ctx.BlockTime())
// Ensure the slash meter is not greater than allowance,
// considering current total voting power.
k.SetSlashMeter(ctx, allowance)
}
}
func (k Keeper) ReplenishSlashMeter(ctx sdktypes.Context) {
meter := k.GetSlashMeter(ctx)
allowance := k.GetSlashMeterAllowance(ctx)
// Replenish meter up to allowance for this block. That is, if meter was negative
// before being replenished, it'll become more positive in value. However, if the meter
// was 0 or positive in value, it'll be replenished only up to it's allowance
// for the current block.
meter = meter.Add(allowance)
if meter.GT(allowance) {
meter = allowance
}
k.SetSlashMeter(ctx, meter)
}
// GetSlashMeterAllowance returns the amount of voting power units (int)
// that would be added to the slash meter for a replenishment this block,
// this allowance value also serves as the max value for the meter for this block.
func (k Keeper) GetSlashMeterAllowance(ctx sdktypes.Context) sdktypes.Int {
strFrac := k.GetSlashMeterReplenishFraction(ctx)
decFrac, err := sdktypes.NewDecFromStr(strFrac)
if err != nil {
panic(fmt.Sprintf("failed to parse slash meter replenish fraction: %s", err))
}
// Compute allowance in units of tendermint voting power (integer),
// noting that total power changes over time
totalPower := k.stakingKeeper.GetLastTotalPower(ctx)
roundedInt := sdktypes.NewInt(decFrac.MulInt(totalPower).RoundInt64())
if roundedInt.IsZero() {
k.Logger(ctx).Info("slash meter replenish fraction is too small " +
"to add any allowance to the meter, considering bankers rounding")
// Return non-zero allowance to guarantee some slash packets are eventually handled
return sdktypes.NewInt(1)
}
return roundedInt
}
//
// CRUD section
//
// QueueGlobalSlashEntry queues an entry to the "global" slash packet queue, used for throttling val power changes
// related to jailing/tombstoning over time. This "global" queue is used to coordinate the order of slash packet handling
// between chains, whereas the chain-specific queue is used to coordinate the order of slash and vsc matured packets
// relevant to each chain.
func (k Keeper) QueueGlobalSlashEntry(ctx sdktypes.Context,
entry providertypes.GlobalSlashEntry) {
store := ctx.KVStore(k.storeKey)
key := providertypes.GlobalSlashEntryKey(entry)
store.Set(key, entry.ProviderValConsAddr)
}
// GetAllGlobalSlashEntries returns all global slash entries in the global queue slash entry queue.
//
// Note: This method is used for testing purposes only.
func (k Keeper) GetAllGlobalSlashEntries(ctx sdktypes.Context) (entries []providertypes.GlobalSlashEntry) {
k.IterateGlobalSlashEntries(ctx, func(entry providertypes.GlobalSlashEntry) (stop bool) {
entries = append(entries, entry)
// Continue iteration
stop = false
return stop
})
return entries
}
// DeleteGlobalSlashEntriesForConsumer deletes all pending slash packet entries in the global queue,
// only relevant to a single consumer.
func (k Keeper) DeleteGlobalSlashEntriesForConsumer(ctx sdktypes.Context, consumerChainID string) {
entriesToDel := []providertypes.GlobalSlashEntry{}
k.IterateGlobalSlashEntries(ctx, func(entry providertypes.GlobalSlashEntry) (stop bool) {
if entry.ConsumerChainID == consumerChainID {
entriesToDel = append(entriesToDel, entry)
}
// Continue iteration
stop = false
return stop
})
k.DeleteGlobalSlashEntries(ctx, entriesToDel...)
}
// IterateGlobalSlashEntries iterates over the global slash entry queue and calls the provided callback
func (k Keeper) IterateGlobalSlashEntries(ctx sdktypes.Context,
cb func(providertypes.GlobalSlashEntry) (stop bool)) {
store := ctx.KVStore(k.storeKey)
iterator := sdktypes.KVStorePrefixIterator(store, []byte{providertypes.GlobalSlashEntryBytePrefix})
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
recvTime, chainID, ibcSeqNum := providertypes.ParseGlobalSlashEntryKey(iterator.Key())
valAddr := iterator.Value()
entry := providertypes.NewGlobalSlashEntry(recvTime, chainID, ibcSeqNum, valAddr)
stop := cb(entry)
if stop {
break
}
}
}
// DeleteGlobalSlashEntries deletes the given global entries from the global slash queue
func (k Keeper) DeleteGlobalSlashEntries(ctx sdktypes.Context, entries ...providertypes.GlobalSlashEntry) {
store := ctx.KVStore(k.storeKey)
for _, entry := range entries {
store.Delete(providertypes.GlobalSlashEntryKey(entry))
}
}
// Pending packet data type enum, used to encode the type of packet data stored at each entry in the mutual queue.
const (
slashPacketData byte = iota
vscMaturedPacketData
)
// PanicIfTooMuchThrottledPacketData is a sanity check to ensure that the chain-specific
// throttled packet data queue does not grow too large for a single consumer chain.
func (k Keeper) PanicIfTooMuchThrottledPacketData(ctx sdktypes.Context, consumerChainID string) {
size := k.GetThrottledPacketDataSize(ctx, consumerChainID)
if size > uint64(k.GetMaxPendingSlashingPackets(ctx)) {
panic(fmt.Sprintf("throttled packet data queue for chain %s is too large: %d", consumerChainID, size))
}
}
// GetThrottledPacketDataSize returns the size of the throttled packet data queue for the given consumer chain
func (k Keeper) GetThrottledPacketDataSize(ctx sdktypes.Context, consumerChainID string) uint64 {
store := ctx.KVStore(k.storeKey)
key := providertypes.ThrottledPacketDataSizeKey(consumerChainID)
var size uint64
bz := store.Get(key)
if bz == nil {
size = 0
} else {
size = binary.LittleEndian.Uint64(bz)
}
return size
}
// SetThrottledPacketDataSize sets the size of the throttled packet data queue for the given consumer chain
func (k Keeper) SetThrottledPacketDataSize(ctx sdktypes.Context, consumerChainID string, size uint64) {
store := ctx.KVStore(k.storeKey)
key := providertypes.ThrottledPacketDataSizeKey(consumerChainID)
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, size)
store.Set(key, bz)
}
// IncrementThrottledPacketDataSize increments the size of the throttled packet data
// queue for the given consumer chain.
func (k Keeper) IncrementThrottledPacketDataSize(ctx sdktypes.Context, consumerChainID string) {
size := k.GetThrottledPacketDataSize(ctx, consumerChainID)
k.SetThrottledPacketDataSize(ctx, consumerChainID, size+1)
}
// QueueThrottledSlashPacketData queues the slash packet data for a chain-specific throttled packet data queue.
//
// Note: This queue is shared between pending slash packet data and pending vsc matured packet data.
func (k Keeper) QueueThrottledSlashPacketData(
ctx sdktypes.Context, consumerChainID string, ibcSeqNum uint64, data ccvtypes.SlashPacketData) {
k.QueueThrottledPacketData(ctx, consumerChainID, ibcSeqNum, data)
}
// QueueThrottledVSCMaturedPacketData queues the vsc matured packet data for a chain-specific throttled packet data queue.
//
// Note: This queue is shared between pending slash packet data and pending vsc matured packet data.
func (k Keeper) QueueThrottledVSCMaturedPacketData(
ctx sdktypes.Context, consumerChainID string, ibcSeqNum uint64, data ccvtypes.VSCMaturedPacketData) {
k.QueueThrottledPacketData(ctx, consumerChainID, ibcSeqNum, data)
}
// QueueThrottledPacketData queues a slash packet data or vsc matured packet data instance
// for the given consumer chain's queue. This method is either used by tests, or called
// by higher level methods with type assertion.
func (k Keeper) QueueThrottledPacketData(
ctx sdktypes.Context, consumerChainID string, ibcSeqNum uint64, packetData interface{}) {
k.PanicIfTooMuchThrottledPacketData(ctx, consumerChainID)
store := ctx.KVStore(k.storeKey)
var bz []byte
var err error
switch data := packetData.(type) {
case ccvtypes.SlashPacketData:
bz, err = data.Marshal()
if err != nil {
panic(fmt.Sprintf("failed to marshal slash packet data: %v", err))
}
bz = append([]byte{slashPacketData}, bz...)
case ccvtypes.VSCMaturedPacketData:
bz, err = data.Marshal()
if err != nil {
panic(fmt.Sprintf("failed to marshal vsc matured packet data: %v", err))
}
bz = append([]byte{vscMaturedPacketData}, bz...)
default:
panic(fmt.Sprintf("unexpected packet data type: %T", data))
}
store.Set(providertypes.ThrottledPacketDataKey(consumerChainID, ibcSeqNum), bz)
k.IncrementThrottledPacketDataSize(ctx, consumerChainID)
}
// IterateThrottledPacketData iterates over the throttled packet data queue for a specific consumer chain
// (ordered by ibc seq number) and calls the provided callback
func (k Keeper) IterateThrottledPacketData(ctx sdktypes.Context, consumerChainID string, cb func(uint64, interface{}) (stop bool)) {
store := ctx.KVStore(k.storeKey)
iteratorPrefix := providertypes.ChainIdWithLenKey(providertypes.ThrottledPacketDataBytePrefix, consumerChainID)
iterator := sdktypes.KVStorePrefixIterator(store, iteratorPrefix)
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
var packetData interface{}
var err error
bz := iterator.Value()
switch bz[0] {
case slashPacketData:
spd := ccvtypes.SlashPacketData{}
err = spd.Unmarshal(bz[1:])
packetData = spd
case vscMaturedPacketData:
vpd := ccvtypes.VSCMaturedPacketData{}
err = vpd.Unmarshal(bz[1:])
packetData = vpd
default:
panic("invalid packet data type")
}
if err != nil {
panic(fmt.Sprintf("failed to unmarshal pending packet data: %v", err))
}
_, ibcSeqNum, err := providertypes.ParseThrottledPacketDataKey(iterator.Key())
if err != nil {
panic(fmt.Sprintf("failed to parse pending packet data key: %v", err))
}
stop := cb(ibcSeqNum, packetData)
if stop {
break
}
}
}
// GetAllThrottledPacketData returns all throttled packet data for a specific consumer chain.
//
// Note: This method is only used by tests
func (k Keeper) GetAllThrottledPacketData(ctx sdktypes.Context, consumerChainID string) (
[]ccvtypes.SlashPacketData, []ccvtypes.VSCMaturedPacketData) {
slashData := []ccvtypes.SlashPacketData{}
vscMaturedData := []ccvtypes.VSCMaturedPacketData{}
k.IterateThrottledPacketData(ctx, consumerChainID, func(ibcSeqNum uint64, data interface{}) (stop bool) {
switch data := data.(type) {
case ccvtypes.SlashPacketData:
slashData = append(slashData, data)
case ccvtypes.VSCMaturedPacketData:
vscMaturedData = append(vscMaturedData, data)
default:
panic(fmt.Sprintf("unexpected pending packet data type: %T", data))
}
// Continue iteration
stop = false
return stop
})
return slashData, vscMaturedData
}
// DeleteAllThrottledPacketDataForConsumer deletes all throttled packet data for the given consumer chain.
func (k Keeper) DeleteAllThrottledPacketDataForConsumer(ctx sdktypes.Context, consumerChainID string) {
ibcSeqNumsToDelete := []uint64{}
k.IterateThrottledPacketData(ctx, consumerChainID, func(ibcSeqNum uint64, packetData interface{}) bool {
ibcSeqNumsToDelete = append(ibcSeqNumsToDelete, ibcSeqNum)
// Continue iteration
stop := false
return stop
})
k.DeleteThrottledPacketData(ctx, consumerChainID, ibcSeqNumsToDelete...)
}
// DeleteThrottledPacketData deletes the given throttled packet data instances
// (specified by their ibc seq number) from the chain-specific throttled packet data queue.
func (k Keeper) DeleteThrottledPacketData(ctx sdktypes.Context, consumerChainID string, ibcSeqNumbers ...uint64) {
store := ctx.KVStore(k.storeKey)
for _, ibcSeqNum := range ibcSeqNumbers {
store.Delete(providertypes.ThrottledPacketDataKey(consumerChainID, ibcSeqNum))
}
// Decrement the size of the pending packet data queue
sizeBeforeDeletion := k.GetThrottledPacketDataSize(ctx, consumerChainID)
k.SetThrottledPacketDataSize(ctx, consumerChainID, sizeBeforeDeletion-uint64(len(ibcSeqNumbers)))
}
// GetSlashMeter returns a meter (persisted as a signed int) which stores an amount of voting power, corresponding
// to an allowance of validators that can be jailed/tombstoned over time.
//
// Note: the value of this int should always be in the range of tendermint's [-MaxVotingPower, MaxVotingPower]
func (k Keeper) GetSlashMeter(ctx sdktypes.Context) sdktypes.Int {
store := ctx.KVStore(k.storeKey)
bz := store.Get(providertypes.SlashMeterKey())
if bz == nil {
panic("slash meter not set")
}
value := sdktypes.ZeroInt()
err := value.Unmarshal(bz)
if err != nil {
panic(fmt.Sprintf("failed to unmarshal slash meter: %v", err))
}
return value
}
// SetSlashMeter sets the slash meter to the given signed int value
//
// Note: the value of this int should always be in the range of tendermint's [-MaxVotingPower, MaxVotingPower]
func (k Keeper) SetSlashMeter(ctx sdktypes.Context, value sdktypes.Int) {
if value.GT(sdktypes.NewInt(tmtypes.MaxTotalVotingPower)) {
panic("slash meter value cannot be greater than tendermint's MaxTotalVotingPower")
}
if value.LT(sdktypes.NewInt(-tmtypes.MaxTotalVotingPower)) {
panic("slash meter value cannot be less than negative tendermint's MaxTotalVotingPower")
}
store := ctx.KVStore(k.storeKey)
bz, err := value.Marshal()
if err != nil {
panic(fmt.Sprintf("failed to marshal slash meter: %v", err))
}
store.Set(providertypes.SlashMeterKey(), bz)
}
// GetLastSlashMeterFullTime returns the last UTC time the slash meter was full.
func (k Keeper) GetLastSlashMeterFullTime(ctx sdktypes.Context) time.Time {
store := ctx.KVStore(k.storeKey)
bz := store.Get(providertypes.LastSlashMeterReplenishTimeKey())
if bz == nil {
panic("last slash replenish time not set")
}
time, err := sdktypes.ParseTimeBytes(bz)
if err != nil {
panic(fmt.Sprintf("failed to parse last slash meter replenish time: %s", err))
}
return time.UTC()
}
// SetLastSlashMeterReplenishTime sets the most recent time the slash meter was full.
func (k Keeper) SetLastSlashMeterFullTime(ctx sdktypes.Context, time time.Time) {
store := ctx.KVStore(k.storeKey)
store.Set(providertypes.LastSlashMeterReplenishTimeKey(), sdktypes.FormatTimeBytes(time.UTC()))
}