@@ -52,6 +52,7 @@ type gpbftRunner struct {
52
52
53
53
inputs gpbftInputs
54
54
msgEncoding gMessageEncoding
55
+ pmm * partialMessageManager
55
56
}
56
57
57
58
type roundPhase struct {
@@ -141,6 +142,12 @@ func newRunner(
141
142
} else {
142
143
runner .msgEncoding = & cborGMessageEncoding {}
143
144
}
145
+
146
+ runner .pmm , err = newPartialMessageManager (runner .Progress , ps , m )
147
+ if err != nil {
148
+ return nil , fmt .Errorf ("creating partial message manager: %w" , err )
149
+ }
150
+
144
151
return runner , nil
145
152
}
146
153
@@ -156,6 +163,11 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
156
163
return err
157
164
}
158
165
166
+ completedMessageQueue , err := h .pmm .Start (ctx )
167
+ if err != nil {
168
+ return err
169
+ }
170
+
159
171
finalityCertificates , unsubCerts := h .certStore .Subscribe ()
160
172
select {
161
173
case c := <- finalityCertificates :
@@ -193,7 +205,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
193
205
default :
194
206
}
195
207
196
- // Handle messages, finality certificates, and alarms
208
+ // Handle messages, completed messages, finality certificates, and alarms
197
209
select {
198
210
case c := <- finalityCertificates :
199
211
if err := h .receiveCertificate (c ); err != nil {
@@ -219,6 +231,29 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
219
231
// errors.
220
232
log .Errorf ("error when processing message: %+v" , err )
221
233
}
234
+ case gmsg , ok := <- completedMessageQueue :
235
+ if ! ok {
236
+ return fmt .Errorf ("incoming completed message queue closed" )
237
+ }
238
+ switch validatedMessage , err := h .participant .ValidateMessage (gmsg ); {
239
+ case errors .Is (err , gpbft .ErrValidationInvalid ):
240
+ log .Debugw ("validation error while validating completed message" , "err" , err )
241
+ // TODO: Signal partial message manager to penalise sender,
242
+ // e.g. reduce the total number of messages stroed from sender?
243
+ case errors .Is (err , gpbft .ErrValidationTooOld ):
244
+ // TODO: Signal partial message manager to drop the instance?
245
+ case errors .Is (err , gpbft .ErrValidationNotRelevant ):
246
+ // TODO: Signal partial message manager to drop irrelevant messages?
247
+ case errors .Is (err , gpbft .ErrValidationNoCommittee ):
248
+ log .Debugw ("committee error while validating completed message" , "err" , err )
249
+ case err != nil :
250
+ log .Errorw ("unknown error while validating completed message" , "err" , err )
251
+ default :
252
+ recordValidatedMessage (ctx , validatedMessage )
253
+ if err := h .participant .ReceiveMessage (validatedMessage ); err != nil {
254
+ log .Errorw ("error while processing completed message" , "err" , err )
255
+ }
256
+ }
222
257
case <- h .runningCtx .Done ():
223
258
return nil
224
259
}
@@ -452,7 +487,17 @@ func (h *gpbftRunner) BroadcastMessage(ctx context.Context, msg *gpbft.GMessage)
452
487
if h .topic == nil {
453
488
return pubsub .ErrTopicClosed
454
489
}
455
- encoded , err := h .msgEncoding .Encode (msg )
490
+
491
+ if err := h .pmm .BroadcastChain (ctx , msg .Vote .Instance , msg .Vote .Value ); err != nil {
492
+ // Silently log the error and continue. Partial message manager should take care of re-broadcast.
493
+ log .Warnw ("failed to broadcast chain" , "instance" , msg .Vote .Instance , "error" , err )
494
+ }
495
+
496
+ pmsg , err := h .pmm .toPartialGMessage (msg )
497
+ if err != nil {
498
+ return err
499
+ }
500
+ encoded , err := h .msgEncoding .Encode (pmsg )
456
501
if err != nil {
457
502
return fmt .Errorf ("encoding GMessage for broadcast: %w" , err )
458
503
}
@@ -472,7 +517,17 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error {
472
517
if h .topic == nil {
473
518
return pubsub .ErrTopicClosed
474
519
}
475
- encoded , err := h .msgEncoding .Encode (msg )
520
+
521
+ if err := h .pmm .BroadcastChain (h .runningCtx , msg .Vote .Instance , msg .Vote .Value ); err != nil {
522
+ // Silently log the error and continue. Partial message manager should take care of re-broadcast.
523
+ log .Warnw ("failed to rebroadcast chain" , "instance" , msg .Vote .Instance , "error" , err )
524
+ }
525
+
526
+ pmsg , err := h .pmm .toPartialGMessage (msg )
527
+ if err != nil {
528
+ return err
529
+ }
530
+ encoded , err := h .msgEncoding .Encode (pmsg )
476
531
if err != nil {
477
532
return fmt .Errorf ("encoding GMessage for broadcast: %w" , err )
478
533
}
@@ -489,12 +544,28 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
489
544
recordValidationTime (ctx , start , _result )
490
545
}(time .Now ())
491
546
492
- gmsg , err := h .msgEncoding .Decode (msg .Data )
547
+ pgmsg , err := h .msgEncoding .Decode (msg .Data )
493
548
if err != nil {
494
549
log .Debugw ("failed to decode message" , "from" , msg .GetFrom (), "err" , err )
495
550
return pubsub .ValidationReject
496
551
}
497
552
553
+ gmsg , completed := h .pmm .CompleteMessage (ctx , pgmsg )
554
+ if ! completed {
555
+ // TODO: Partially validate the message because we can. To do this, however,
556
+ // message validator needs to be refactored to tolerate partial data.
557
+ // Hence, for now validation is postponed entirely until that refactor
558
+ // is done to accommodate partial messages.
559
+ // See: https://github.com/filecoin-project/go-f3/issues/813
560
+
561
+ // FIXME: must verify signature before buffering otherwise nodes can spoof the
562
+ // buffer with invalid messages on behalf of other peers as censorship
563
+ // attack.
564
+
565
+ msg .ValidatorData = pgmsg
566
+ return pubsub .ValidationAccept
567
+ }
568
+
498
569
switch validatedMessage , err := h .participant .ValidateMessage (gmsg ); {
499
570
case errors .Is (err , gpbft .ErrValidationInvalid ):
500
571
log .Debugf ("validation error during validation: %+v" , err )
@@ -588,15 +659,18 @@ func (h *gpbftRunner) startPubsub() (<-chan gpbft.ValidatedMessage, error) {
588
659
}
589
660
return fmt .Errorf ("pubsub message subscription returned an error: %w" , err )
590
661
}
591
- gmsg , ok := msg .ValidatorData .(gpbft.ValidatedMessage )
592
- if ! ok {
662
+
663
+ switch gmsg := msg .ValidatorData .(type ) {
664
+ case gpbft.ValidatedMessage :
665
+ select {
666
+ case messageQueue <- gmsg :
667
+ case <- h .runningCtx .Done ():
668
+ return nil
669
+ }
670
+ case * PartialGMessage :
671
+ h .pmm .bufferPartialMessage (h .runningCtx , gmsg )
672
+ default :
593
673
log .Errorf ("invalid msgValidatorData: %+v" , msg .ValidatorData )
594
- continue
595
- }
596
- select {
597
- case messageQueue <- gmsg :
598
- case <- h .runningCtx .Done ():
599
- return nil
600
674
}
601
675
}
602
676
return nil
@@ -632,18 +706,25 @@ func (h *gpbftHost) RequestRebroadcast(instant gpbft.Instant) error {
632
706
}
633
707
634
708
func (h * gpbftHost ) GetProposal (instance uint64 ) (* gpbft.SupplementalData , gpbft.ECChain , error ) {
635
- return h .inputs .GetProposal (h .runningCtx , instance )
709
+ proposal , chain , err := h .inputs .GetProposal (h .runningCtx , instance )
710
+ if err == nil {
711
+ if err := h .pmm .BroadcastChain (h .runningCtx , instance , chain ); err != nil {
712
+ log .Warnw ("failed to broadcast chain" , "instance" , instance , "error" , err )
713
+ }
714
+ }
715
+ return proposal , chain , err
636
716
}
637
717
638
718
func (h * gpbftHost ) GetCommittee (instance uint64 ) (* gpbft.Committee , error ) {
639
719
return h .inputs .GetCommittee (h .runningCtx , instance )
640
720
}
641
721
642
- func (h * gpbftRunner ) Stop (context.Context ) error {
722
+ func (h * gpbftRunner ) Stop (ctx context.Context ) error {
643
723
h .ctxCancel ()
644
724
return multierr .Combine (
645
725
h .wal .Close (),
646
726
h .errgrp .Wait (),
727
+ h .pmm .Shutdown (ctx ),
647
728
h .teardownPubsub (),
648
729
)
649
730
}
0 commit comments