diff --git a/errors.go b/errors.go index 8a04f6b6..127499c7 100644 --- a/errors.go +++ b/errors.go @@ -2,11 +2,19 @@ package header import "fmt" -// VerifyError is thrown on during Verify if it fails. +// VerifyError is thrown during for Headers failed verification. type VerifyError struct { + // Reason why verification failed as inner error. Reason error + // Uncertain signals that the was not enough information to conclude a Header is correct or not. + // May happen with recent Headers during unfinished historical sync or because of local errors. + Uncertain bool } func (vr *VerifyError) Error() string { return fmt.Sprintf("header: verify: %s", vr.Reason.Error()) } + +func (vr *VerifyError) Unwrap() error { + return vr.Reason +} diff --git a/headertest/subscriber.go b/headertest/subscriber.go index bb8d05fc..bfb427c6 100644 --- a/headertest/subscriber.go +++ b/headertest/subscriber.go @@ -3,8 +3,6 @@ package headertest import ( "context" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/celestiaorg/go-header" ) @@ -16,7 +14,7 @@ func NewDummySubscriber() *Subscriber[*DummyHeader] { return &Subscriber[*DummyHeader]{} } -func (mhs *Subscriber[H]) AddValidator(func(context.Context, H) pubsub.ValidationResult) error { +func (mhs *Subscriber[H]) SetVerifier(func(context.Context, H) error) error { return nil } diff --git a/interface.go b/interface.go index e9596196..9f323c1f 100644 --- a/interface.go +++ b/interface.go @@ -20,15 +20,14 @@ type Subscriber[H Header] interface { // Subscribe creates long-living Subscription for validated Headers. // Multiple Subscriptions can be created. Subscribe() (Subscription[H], error) - // AddValidator registers a Validator for all Subscriptions. - // Registered Validators screen Headers for their validity - // before they are sent through Subscriptions. - // Multiple validators can be registered. - AddValidator(func(context.Context, H) pubsub.ValidationResult) error + // SetVerifier registers verification func for all Subscriptions. + // Registered func screens incoming headers + // before they are forwarded to Subscriptions. + // Only one func can be set. + SetVerifier(func(context.Context, H) error) error } -// Subscription can retrieve the next Header from the -// network. +// Subscription listens for new Headers. type Subscription[H Header] interface { // NextHeader returns the newest verified and valid Header // in the network. diff --git a/p2p/subscriber.go b/p2p/subscriber.go index 4db31c53..0c8d3467 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "errors" "fmt" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -52,8 +53,9 @@ func (p *Subscriber[H]) Stop(context.Context) error { return p.topic.Close() } -// AddValidator applies basic pubsub validator for the topic. -func (p *Subscriber[H]) AddValidator(val func(context.Context, H) pubsub.ValidationResult) error { +// SetVerifier set given verification func as Header PubSub topic validator +// Does not punish peers if *header.VerifyError is given with Uncertain set to true. +func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult { var empty H maybeHead := empty.New() @@ -65,7 +67,17 @@ func (p *Subscriber[H]) AddValidator(val func(context.Context, H) pubsub.Validat return pubsub.ValidationReject } msg.ValidatorData = maybeHead - return val(ctx, maybeHead.(H)) + + var verErr *header.VerifyError + err = val(ctx, maybeHead.(H)) + switch { + case err == nil: + return pubsub.ValidationAccept + case errors.As(err, &verErr) && verErr.Uncertain: + return pubsub.ValidationIgnore + default: + return pubsub.ValidationReject + } } return p.pubsub.RegisterTopicValidator(p.pubsubTopicID, pval) } diff --git a/p2p/subscription_test.go b/p2p/subscription_test.go index b7128314..d3a783b9 100644 --- a/p2p/subscription_test.go +++ b/p2p/subscription_test.go @@ -66,9 +66,11 @@ func TestSubscriber(t *testing.T) { _, err = p2pSub2.Subscribe() require.NoError(t, err) - p2pSub1.AddValidator(func(context.Context, *headertest.DummyHeader) pubsub.ValidationResult { //nolint:errcheck - return pubsub.ValidationAccept + err = p2pSub1.SetVerifier(func(context.Context, *headertest.DummyHeader) error { + return nil }) + require.NoError(t, err) + subscription, err := p2pSub1.Subscribe() require.NoError(t, err) diff --git a/sync/sync.go b/sync/sync.go index fd4b2e8c..548d1920 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -83,7 +83,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error { s.ctx, s.cancel = context.WithCancel(context.Background()) // register validator for header subscriptions // syncer does not subscribe itself and syncs headers together with validation - err := s.sub.AddValidator(s.incomingNetworkHead) + err := s.sub.SetVerifier(s.incomingNetworkHead) if err != nil { return err } diff --git a/sync/sync_head.go b/sync/sync_head.go index 1e0e88b4..1dc43639 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -5,8 +5,6 @@ import ( "errors" "time" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/celestiaorg/go-header" ) @@ -49,7 +47,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption) (H, error) // NOTE: We could trust the netHead like we do during 'automatic subjective initialization' // but in this case our subjective head is not expired, so we should verify netHead // and only if it is valid, set it as new head - s.incomingNetworkHead(ctx, netHead) + _ = s.incomingNetworkHead(ctx, netHead) // netHead was either accepted or rejected as the new subjective // anyway return most current known subjective head return s.subjectiveHead(ctx) @@ -136,33 +134,35 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { // incomingNetworkHead processes new potential network headers. // If the header valid, sets as new subjective header. -func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, netHead H) pubsub.ValidationResult { +func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, netHead H) error { // ensure there is no racing between network head candidates s.incomingMu.Lock() defer s.incomingMu.Unlock() // first of all, check the validity of the netHead - res := s.validateHead(ctx, netHead) - if res == pubsub.ValidationAccept { - // and set it if valid - s.setSubjectiveHead(ctx, netHead) + err := s.validateHead(ctx, netHead) + if err != nil { + return err } - return res + // and set it if valid + s.setSubjectiveHead(ctx, netHead) + return nil } // validateHead checks validity of the given header against the subjective head. -func (s *Syncer[H]) validateHead(ctx context.Context, new H) pubsub.ValidationResult { +func (s *Syncer[H]) validateHead(ctx context.Context, new H) error { sbjHead, err := s.subjectiveHead(ctx) if err != nil { log.Errorw("getting subjective head during validation", "err", err) - return pubsub.ValidationIgnore // local error, so ignore + // local error, so uncertain + return &header.VerifyError{Reason: err, Uncertain: true} } - // ignore header if it's from the past if new.Height() <= sbjHead.Height() { log.Warnw("received known network header", "current_height", sbjHead.Height(), "header_height", new.Height(), "header_hash", new.Hash()) - return pubsub.ValidationIgnore + // set uncertain, if it's from the past + return &header.VerifyError{Reason: err, Uncertain: true} } // perform verification err = sbjHead.Verify(new) @@ -174,10 +174,10 @@ func (s *Syncer[H]) validateHead(ctx context.Context, new H) pubsub.ValidationRe "height_of_subjective", sbjHead.Height(), "hash_of_subjective", sbjHead.Hash(), "reason", verErr.Reason) - return pubsub.ValidationReject + return verErr } // and accept if the header is good - return pubsub.ValidationAccept + return nil } // TODO(@Wondertan): We should request TrustingPeriod from the network's state params or diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 715d8dcd..9ed6b1a3 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -9,7 +9,6 @@ import ( "github.com/ipfs/go-datastore" sync2 "github.com/ipfs/go-datastore/sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,7 +40,7 @@ func TestSyncer_incomingNetworkHeadRaces(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - if syncer.incomingNetworkHead(ctx, incoming) == pubsub.ValidationAccept { + if syncer.incomingNetworkHead(ctx, incoming) == nil { hits.Add(1) } }() diff --git a/sync/sync_test.go b/sync/sync_test.go index cebf32d7..ba8adb9e 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -123,8 +122,8 @@ func TestSyncCatchUp(t *testing.T) { incomingHead := suite.GenDummyHeaders(1)[0] // 3. syncer rcvs header from the future and starts catching-up - res := syncer.incomingNetworkHead(ctx, incomingHead) - assert.Equal(t, pubsub.ValidationAccept, res) + err = syncer.incomingNetworkHead(ctx, incomingHead) + require.NoError(t, err) time.Sleep(time.Millisecond * 100) // needs some to realize it is syncing err = syncer.SyncWait(ctx) @@ -273,13 +272,15 @@ func TestSyncerIncomingDuplicate(t *testing.T) { err = remoteStore.Append(ctx, range1...) require.NoError(t, err) - res := syncer.incomingNetworkHead(ctx, range1[len(range1)-1]) - assert.Equal(t, pubsub.ValidationAccept, res) + err = syncer.incomingNetworkHead(ctx, range1[len(range1)-1]) + require.NoError(t, err) time.Sleep(time.Millisecond * 10) - res = syncer.incomingNetworkHead(ctx, range1[len(range1)-1]) - assert.Equal(t, pubsub.ValidationIgnore, res) + var verErr *header.VerifyError + err = syncer.incomingNetworkHead(ctx, range1[len(range1)-1]) + assert.ErrorAs(t, err, &verErr) + assert.True(t, verErr.Uncertain) err = syncer.SyncWait(ctx) require.NoError(t, err)