Skip to content

Commit

Permalink
refactor!: Subcriber: AddValidator -> SetVerifier (#88)
Browse files Browse the repository at this point in the history
* Verifier is the correct term to use for stateful verification
* Set because, in fact we currently allow only one verification func
* Remove dependency pubsub to support returning header.VerifyError

NOTE: This breakage does not affect anyone, as the only user of SetVerifier is Syncer which is updated in this PR
  • Loading branch information
Wondertan authored Aug 15, 2023
1 parent fe54fd9 commit e500905
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 41 deletions.
10 changes: 9 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@ package header

import "fmt"

// VerifyError is thrown on during Verify if it fails.
// VerifyError is thrown during for Headers failed verification.
type VerifyError struct {
// Reason why verification failed as inner error.
Reason error
// Uncertain signals that the was not enough information to conclude a Header is correct or not.
// May happen with recent Headers during unfinished historical sync or because of local errors.
Uncertain bool
}

func (vr *VerifyError) Error() string {
return fmt.Sprintf("header: verify: %s", vr.Reason.Error())
}

func (vr *VerifyError) Unwrap() error {
return vr.Reason
}
4 changes: 1 addition & 3 deletions headertest/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package headertest
import (
"context"

pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/go-header"
)

Expand All @@ -16,7 +14,7 @@ func NewDummySubscriber() *Subscriber[*DummyHeader] {
return &Subscriber[*DummyHeader]{}
}

func (mhs *Subscriber[H]) AddValidator(func(context.Context, H) pubsub.ValidationResult) error {
func (mhs *Subscriber[H]) SetVerifier(func(context.Context, H) error) error {
return nil
}

Expand Down
13 changes: 6 additions & 7 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ type Subscriber[H Header] interface {
// Subscribe creates long-living Subscription for validated Headers.
// Multiple Subscriptions can be created.
Subscribe() (Subscription[H], error)
// AddValidator registers a Validator for all Subscriptions.
// Registered Validators screen Headers for their validity
// before they are sent through Subscriptions.
// Multiple validators can be registered.
AddValidator(func(context.Context, H) pubsub.ValidationResult) error
// SetVerifier registers verification func for all Subscriptions.
// Registered func screens incoming headers
// before they are forwarded to Subscriptions.
// Only one func can be set.
SetVerifier(func(context.Context, H) error) error
}

// Subscription can retrieve the next Header from the
// network.
// Subscription listens for new Headers.
type Subscription[H Header] interface {
// NextHeader returns the newest verified and valid Header
// in the network.
Expand Down
18 changes: 15 additions & 3 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"errors"
"fmt"

pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -52,8 +53,9 @@ func (p *Subscriber[H]) Stop(context.Context) error {
return p.topic.Close()
}

// AddValidator applies basic pubsub validator for the topic.
func (p *Subscriber[H]) AddValidator(val func(context.Context, H) pubsub.ValidationResult) error {
// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
var empty H
maybeHead := empty.New()
Expand All @@ -65,7 +67,17 @@ func (p *Subscriber[H]) AddValidator(val func(context.Context, H) pubsub.Validat
return pubsub.ValidationReject
}
msg.ValidatorData = maybeHead
return val(ctx, maybeHead.(H))

var verErr *header.VerifyError
err = val(ctx, maybeHead.(H))
switch {
case err == nil:
return pubsub.ValidationAccept
case errors.As(err, &verErr) && verErr.Uncertain:
return pubsub.ValidationIgnore
default:
return pubsub.ValidationReject
}
}
return p.pubsub.RegisterTopicValidator(p.pubsubTopicID, pval)
}
Expand Down
6 changes: 4 additions & 2 deletions p2p/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ func TestSubscriber(t *testing.T) {
_, err = p2pSub2.Subscribe()
require.NoError(t, err)

p2pSub1.AddValidator(func(context.Context, *headertest.DummyHeader) pubsub.ValidationResult { //nolint:errcheck
return pubsub.ValidationAccept
err = p2pSub1.SetVerifier(func(context.Context, *headertest.DummyHeader) error {
return nil
})
require.NoError(t, err)

subscription, err := p2pSub1.Subscribe()
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
s.ctx, s.cancel = context.WithCancel(context.Background())
// register validator for header subscriptions
// syncer does not subscribe itself and syncs headers together with validation
err := s.sub.AddValidator(s.incomingNetworkHead)
err := s.sub.SetVerifier(s.incomingNetworkHead)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/go-header"
)

Expand Down Expand Up @@ -49,7 +47,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption) (H, error)
// NOTE: We could trust the netHead like we do during 'automatic subjective initialization'
// but in this case our subjective head is not expired, so we should verify netHead
// and only if it is valid, set it as new head
s.incomingNetworkHead(ctx, netHead)
_ = s.incomingNetworkHead(ctx, netHead)
// netHead was either accepted or rejected as the new subjective
// anyway return most current known subjective head
return s.subjectiveHead(ctx)
Expand Down Expand Up @@ -136,33 +134,35 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {

// incomingNetworkHead processes new potential network headers.
// If the header valid, sets as new subjective header.
func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, netHead H) pubsub.ValidationResult {
func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, netHead H) error {
// ensure there is no racing between network head candidates
s.incomingMu.Lock()
defer s.incomingMu.Unlock()
// first of all, check the validity of the netHead
res := s.validateHead(ctx, netHead)
if res == pubsub.ValidationAccept {
// and set it if valid
s.setSubjectiveHead(ctx, netHead)
err := s.validateHead(ctx, netHead)
if err != nil {
return err
}
return res
// and set it if valid
s.setSubjectiveHead(ctx, netHead)
return nil
}

// validateHead checks validity of the given header against the subjective head.
func (s *Syncer[H]) validateHead(ctx context.Context, new H) pubsub.ValidationResult {
func (s *Syncer[H]) validateHead(ctx context.Context, new H) error {
sbjHead, err := s.subjectiveHead(ctx)
if err != nil {
log.Errorw("getting subjective head during validation", "err", err)
return pubsub.ValidationIgnore // local error, so ignore
// local error, so uncertain
return &header.VerifyError{Reason: err, Uncertain: true}
}
// ignore header if it's from the past
if new.Height() <= sbjHead.Height() {
log.Warnw("received known network header",
"current_height", sbjHead.Height(),
"header_height", new.Height(),
"header_hash", new.Hash())
return pubsub.ValidationIgnore
// set uncertain, if it's from the past
return &header.VerifyError{Reason: err, Uncertain: true}
}
// perform verification
err = sbjHead.Verify(new)
Expand All @@ -174,10 +174,10 @@ func (s *Syncer[H]) validateHead(ctx context.Context, new H) pubsub.ValidationRe
"height_of_subjective", sbjHead.Height(),
"hash_of_subjective", sbjHead.Hash(),
"reason", verErr.Reason)
return pubsub.ValidationReject
return verErr
}
// and accept if the header is good
return pubsub.ValidationAccept
return nil
}

// TODO(@Wondertan): We should request TrustingPeriod from the network's state params or
Expand Down
3 changes: 1 addition & 2 deletions sync/sync_head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/ipfs/go-datastore"
sync2 "github.com/ipfs/go-datastore/sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -41,7 +40,7 @@ func TestSyncer_incomingNetworkHeadRaces(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
if syncer.incomingNetworkHead(ctx, incoming) == pubsub.ValidationAccept {
if syncer.incomingNetworkHead(ctx, incoming) == nil {
hits.Add(1)
}
}()
Expand Down
15 changes: 8 additions & 7 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -123,8 +122,8 @@ func TestSyncCatchUp(t *testing.T) {

incomingHead := suite.GenDummyHeaders(1)[0]
// 3. syncer rcvs header from the future and starts catching-up
res := syncer.incomingNetworkHead(ctx, incomingHead)
assert.Equal(t, pubsub.ValidationAccept, res)
err = syncer.incomingNetworkHead(ctx, incomingHead)
require.NoError(t, err)

time.Sleep(time.Millisecond * 100) // needs some to realize it is syncing
err = syncer.SyncWait(ctx)
Expand Down Expand Up @@ -273,13 +272,15 @@ func TestSyncerIncomingDuplicate(t *testing.T) {
err = remoteStore.Append(ctx, range1...)
require.NoError(t, err)

res := syncer.incomingNetworkHead(ctx, range1[len(range1)-1])
assert.Equal(t, pubsub.ValidationAccept, res)
err = syncer.incomingNetworkHead(ctx, range1[len(range1)-1])
require.NoError(t, err)

time.Sleep(time.Millisecond * 10)

res = syncer.incomingNetworkHead(ctx, range1[len(range1)-1])
assert.Equal(t, pubsub.ValidationIgnore, res)
var verErr *header.VerifyError
err = syncer.incomingNetworkHead(ctx, range1[len(range1)-1])
assert.ErrorAs(t, err, &verErr)
assert.True(t, verErr.Uncertain)

err = syncer.SyncWait(ctx)
require.NoError(t, err)
Expand Down

0 comments on commit e500905

Please sign in to comment.