-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub_plebbit_validator.go
264 lines (230 loc) · 9.06 KB
/
pubsub_plebbit_validator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package pubsubPlebbitValidator
import (
"context"
// "fmt"
"time"
"math"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
peer "github.com/libp2p/go-libp2p/core/peer"
crypto "github.com/libp2p/go-libp2p/core/crypto"
lru "github.com/hashicorp/golang-lru/v2"
host "github.com/libp2p/go-libp2p/core/host"
blake2b "github.com/minio/blake2b-simd"
)
func validateSignature(message map[string]interface{}, signature Signature) bool {
bytesToSign := getBytesToSign(message, signature.signedPropertyNames)
signatureVerified := verifyEd25519(bytesToSign, signature.signature, signature.publicKey)
if (signatureVerified == false) {
// fmt.Println("invalid signature")
return false
}
return true
}
func validateType(messageType string) bool {
if messageType != "CHALLENGEREQUEST" && messageType != "CHALLENGE" && messageType != "CHALLENGEANSWER" && messageType != "CHALLENGEVERIFICATION" {
// fmt.Println("invalid message type")
return false
}
return true
}
func validateChallengeRequestId(challengeRequestId []byte, signature Signature, messageType string) bool {
// challenge request id can only be invalid if from non sub owner, ie CHALLENGEREQUEST or CHALLENGEANSWER
if messageType != "CHALLENGEREQUEST" && messageType != "CHALLENGEANSWER" {
return true
}
publicKey, err := crypto.UnmarshalEd25519PublicKey(signature.publicKey)
if (err != nil) {
// fmt.Println("invalid challenge request id, failed crypto.UnmarshalEd25519PublicKey(signature.publicKey)", err)
return false
}
challengeRequestIdPeerId, err := peer.IDFromBytes(challengeRequestId)
if (err != nil) {
// fmt.Println("invalid challenge request id, failed peer.IDFromPublicKey(signature.publicKey)", err)
return false
}
if (challengeRequestIdPeerId.MatchesPublicKey(publicKey) == false) {
// fmt.Println("invalid challenge request id, failed challengeRequestId.MatchesPublicKey(publicKey)")
return false
}
return true
}
func validatePubsubTopic(pubsubTopic string, signature Signature, messageType string) bool {
// pubsub topic can only be invalid if from sub owner, ie CHALLENGE or CHALLENGEVERIFICATION
if messageType != "CHALLENGE" && messageType != "CHALLENGEVERIFICATION" {
return true
}
signaturePeerId, err := getPeerIdFromPublicKey(signature.publicKey)
if (err != nil) {
// fmt.Println("invalid pubsub topic, failed getPeerIdFromPublicKey(signature.publicKey)", err)
return false
}
if (pubsubTopic != signaturePeerId.String()) {
// fmt.Println("invalid pubsub topic, failed pubsubTopic == signaturePeerId")
return false
}
return true
}
func validateTimestamp(message map[string]interface{}, validator Validator) bool {
// ignore timestamp for tests that use old hardcoded signatures
if (validator.noTimestamp) {
return true
}
timestamp, ok := message["timestamp"].(uint64)
if !ok {
// fmt.Println("invalid message timestamp, failed convert message.timestamp to uint64")
return false
}
now := uint64(time.Now().Unix())
fiveMinutes := uint64(60 * 5)
if (timestamp > now + fiveMinutes) {
// fmt.Println("invalid message timestamp, newer than now + 5 minutes")
return false
}
if (timestamp < now - fiveMinutes) {
// fmt.Println("invalid message timestamp, older than 5 minutes")
return false
}
return true
}
func validatePeer(message map[string]interface{}, challengeRequestId []byte, peerId peer.ID, messageType string, validator Validator) bool {
// nothing to do for challenge message type
if (messageType == "CHALLENGE") {
return true
}
peerIdString := string(peerId)
// get challenge request id string
challengeRequestIdString := string(challengeRequestId)
if (!validator.challenges.Contains(challengeRequestIdString)) {
validator.challenges.Add(challengeRequestIdString, make(map[string]bool))
}
// get peer hostnames associated with the challenge request id
challengePeers, _ := validator.challenges.Get(challengeRequestIdString)
// on challenge verification, challenges and peer statistics are updated with the completed challenge
if (messageType == "CHALLENGEVERIFICATION") {
// update the peer hostname completedChallengeCount
if (validator.peersStatistics.Contains(peerIdString)) {
peerStatistics, _ := validator.peersStatistics.Get(peerIdString)
peerStatistics.completedChallengeCount++
}
// delete the challenge because it's now completed
validator.challenges.Remove(challengeRequestIdString)
return true
}
// the 2 message types left are CHALLENGEREQUEST AND CHALLENGEANSWER
// handle setting Validator.peersStatistics
if (!validator.peersStatistics.Contains(peerIdString)) {
validator.peersStatistics.Add(peerIdString, PeerStatistics{1, 0})
} else {
peerStatistics, _ := validator.peersStatistics.Get(peerIdString)
peerStatistics.challengeCount++
}
// handle setting Validator.challenges
challengePeers[peerIdString] = true
return true
}
type PeerStatistics struct {
challengeCount uint
completedChallengeCount uint
}
type Validator struct {
host host.Host
challenges *lru.Cache[string, map[string]bool]
peersStatistics *lru.Cache[string, PeerStatistics]
noTimestamp bool
}
func NewValidator(host host.Host) Validator {
challenges, _ := lru.New[string, map[string]bool](10000)
peersStatistics, _ := lru.New[string, PeerStatistics](10000)
return Validator{
host,
challenges,
peersStatistics,
false,
}
}
func (validator Validator) Validate(ctx context.Context, peerId peer.ID, pubsubMessage *pubsub.Message) bool {
// cbor decode
message, err := cborDecode(pubsubMessage.Data)
if (err != nil) {
// fmt.Println("failed cbor decode", err)
return false
}
signature, err := toSignature(message["signature"])
if (err != nil) {
// fmt.Println("invalid signature, failed cbor decode", err)
return false
}
messageType, ok := message["type"].(string)
if !ok {
// fmt.Println("invalid message type, failed convert message.type to string")
return false
}
challengeRequestId, ok := message["challengeRequestId"].([]byte)
if !ok {
// fmt.Println("invalid challenge request id, failed convert message.challengeRequestId to []byte")
return false
}
// validate message type
validType := validateType(messageType)
if (validType == false) {
return false
}
// validate signature
signed := validateSignature(message, signature)
if (signed == false) {
return false
}
// validate challengeRequestId if from author
validChallengeRequestId := validateChallengeRequestId(challengeRequestId, signature, messageType)
if (validChallengeRequestId == false) {
return false
}
// validate pubsub topic if from subplebbit owner
validPubsubTopic := validatePubsubTopic(*pubsubMessage.Topic, signature, messageType)
if (validPubsubTopic == false) {
return false
}
// validate timestamp
validTimestamp := validateTimestamp(message, validator)
if (validTimestamp == false) {
return false
}
// validate too many failed requests forwards
validPeer := validatePeer(message, challengeRequestId, peerId, messageType, validator)
if (validPeer == false) {
return false
}
// debug peer validator
// fmt.Println(validator.challenges.Keys())
// fmt.Println(validator.peersStatistics.Keys())
// peerIds := validator.peersStatistics.Keys()
// for i := 0; i < len(peerIds); i++ {
// fmt.Println(validator.peersStatistics.Get(peerIds[i]))
// }
// validator.AppSpecificScore(peerId)
return true
}
var minimumChallengeCount uint = 100
var worstScore float64 = -100000
func (validator Validator) AppSpecificScore(peerId peer.ID) float64 {
// disable AppSpecificScore until we can test it more
return 0
peerStatistics, _ := validator.peersStatistics.Get(string(peerId))
// need a minimum count for statistics to mean something
if (peerStatistics.challengeCount < minimumChallengeCount) {
return 0
}
challengeFailureRatio := float64(1 - (peerStatistics.completedChallengeCount / peerStatistics.challengeCount))
// 1% failure ratio: 0.01²×−100000 = -10
// 10% failure ratio: 0.10²×−100000 = -1000
// 50% failure ratio: 0.50²×−100000 = -25000
// 90% failure ratio: 0.90²×−100000 = -81000
score := math.Pow(challengeFailureRatio, 2) * worstScore
return score
}
// use blake2b because it's faster than sha, copied from https://github.com/filecoin-project/lotus/blob/42d2f4d7e48104c4b8c6f19720e4eef369976442/node/modules/lp2p/pubsub.go
func MessageIdFn(m *pubsub_pb.Message) string {
hash := blake2b.Sum256(m.Data)
return string(hash[:])
}