Skip to content

Commit

Permalink
fraud: rework unmarshaler registration (#938)
Browse files Browse the repository at this point in the history
* fraud: rework unmarshaler registration

* fraud/docs: update an ADR
  • Loading branch information
vgonkivs authored Jul 29, 2022
1 parent 0c2daea commit da5a5a4
Show file tree
Hide file tree
Showing 15 changed files with 149 additions and 224 deletions.
1 change: 0 additions & 1 deletion das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {

// create fraud service and break one header
f := fraud.NewService(ps, mockGet.GetByHeight, ds)
require.NoError(t, f.RegisterUnmarshaler(fraud.BadEncoding, fraud.UnmarshalBEFP))
mockGet.headers[1] = header.CreateFraudExtHeader(t, mockGet.headers[1], bServ)

// create and start DASer
Expand Down
14 changes: 4 additions & 10 deletions docs/adr/adr-006-fraud-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- 2022.06.15 - Extend Proof interface with HeaderHash method
- 2022.06.22 - Updated rsmt2d to change isRow to Axis
- 2022.07.03 - Add storage description
- 2022.07.23 - rework unmarshalers registration

## Authors

Expand Down Expand Up @@ -151,12 +152,6 @@ In addition, `das.Daser`:
// Subscribe allows to subscribe on pub sub topic by its type.
// Subscribe should register pub-sub validator on topic.
Subscribe(ctx context.Context, proofType ProofType) (Subscription, error)
// RegisterUnmarshaler registers unmarshaler for the given ProofType.
// If there is no unmarshaler for `ProofType`, then `Subscribe` returns an error.
RegisterUnmarshaller(proofType ProofType, f proofUnmarshaller) error
// UnregisterUnmarshaler removes unmarshaler for the given ProofType.
// If there is no unmarshaler for `ProofType`, then it returns an error.
UnregisterUnmarshaller(proofType ProofType) error{}
}
```

Expand All @@ -177,12 +172,11 @@ In addition, `das.Daser`:
stores map[ProofType]datastore.Datastore
topics map[ProofType]*pubsub.Topic
unmarshallers map[ProofType]ProofUnmarshaller
getter headerFetcher
ds datastore.Datastore
}
func(s *service) RegisterUnmarshaler(proofType ProofType, f ProofUnmarshaller) error{}
func(s *service) UnregisterUnmarshaler(proofType ProofType) error{}
func(s *service) Subscribe(ctx context.Context, proofType ProofType) (Subscription, error){}
func(s *service) Broadcast(ctx context.Context, p Proof) error{}
```
Expand Down
6 changes: 5 additions & 1 deletion fraud/bad_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import (
"github.com/celestiaorg/celestia-node/ipld/plugin"
)

func init() {
Register(BadEncoding, UnmarshalBEFP)
}

type BadEncodingProof struct {
headerHash []byte
BlockHeight uint64
// ShareWithProof contains all shares from row or col.
// Shares that did not pass verification in rmst2d will be nil.
// Shares that did not pass verification in rsmt2d will be nil.
// For non-nil shares MerkleProofs are computed.
Shares []*ipld.ShareWithProof
// Index represents the row/col index where ErrByzantineRow/ErrByzantineColl occurred.
Expand Down
29 changes: 29 additions & 0 deletions fraud/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package fraud

import (
"context"

"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

const fraudSubSuffix = "-sub"

func getSubTopic(p ProofType) string {
return p.String() + fraudSubSuffix
}

func join(p *pubsub.PubSub, proofType ProofType,
validate func(context.Context, ProofType, *pubsub.Message) pubsub.ValidationResult) (*pubsub.Topic, error) {
t, err := p.Join(getSubTopic(proofType))
if err != nil {
return nil, err
}
err = p.RegisterTopicValidator(
getSubTopic(proofType),
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
return validate(ctx, proofType, msg)
},
)
return t, err
}
16 changes: 5 additions & 11 deletions fraud/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ type Broadcaster interface {
type Subscriber interface {
// Subscribe allows to subscribe on a Proof pub sub topic by its type.
Subscribe(ProofType) (Subscription, error)
// RegisterUnmarshaler registers unmarshaler for the given ProofType.
// If there is no unmarshaler for `ProofType`, then `Subscribe` returns an error.
RegisterUnmarshaler(ProofType, ProofUnmarshaler) error
// UnregisterUnmarshaler removes unmarshaler for the given ProofType.
// If there is no unmarshaler for `ProofType`, then it returns an error.
UnregisterUnmarshaler(ProofType) error
}

// Getter encompasses the behavior to fetch stored FraudProofs.
type Getter interface {
Get(context.Context, ProofType) ([]Proof, error)
}

// Subscription returns a valid proof if one is received on the topic.
Expand All @@ -51,8 +50,3 @@ type Subscription interface {
Proof(context.Context) (Proof, error)
Cancel()
}

// Getter encompasses the behavior to fetch stored FraudProofs.
type Getter interface {
Get(context.Context, ProofType) ([]Proof, error)
}
44 changes: 43 additions & 1 deletion fraud/proof.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fraud

import (
"context"
"encoding"
"fmt"

Expand All @@ -15,6 +16,14 @@ func (e *ErrFraudExists) Error() string {
return fmt.Sprintf("fraud: %s proof exists\n", e.Proof[0].Type())
}

type errNoUnmarshaler struct {
proofType ProofType
}

func (e *errNoUnmarshaler) Error() string {
return fmt.Sprintf("fraud: unmarshaler for %s type is not registered", e.proofType)
}

type ProofType int

const (
Expand Down Expand Up @@ -44,5 +53,38 @@ type Proof interface {
Validate(*header.ExtendedHeader) error

encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}

// OnProof subscribes to the given Fraud Proof topic via the given Subscriber.
// In case a Fraud Proof is received, then the given handle function will be invoked.
func OnProof(ctx context.Context, subscriber Subscriber, p ProofType, handle func(proof Proof)) {
subscription, err := subscriber.Subscribe(p)
if err != nil {
log.Error(err)
return
}
defer subscription.Cancel()

// At this point we receive already verified fraud proof,
// so there is no need to call Validate.
proof, err := subscription.Proof(ctx)
if err != nil {
if err != context.Canceled {
log.Errorw("reading next proof failed", "err", err)
}
return
}

handle(proof)
}

// Unmarshal converts raw bytes into respective Proof type.
func Unmarshal(proofType ProofType, msg []byte) (Proof, error) {
unmarshalersLk.RLock()
defer unmarshalersLk.RUnlock()
unmarshaler, ok := defaultUnmarshalers[proofType]
if !ok {
return nil, &errNoUnmarshaler{proofType: proofType}
}
return unmarshaler(msg)
}
17 changes: 17 additions & 0 deletions fraud/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package fraud

import (
"sync"
)

var (
unmarshalersLk = sync.RWMutex{}
defaultUnmarshalers = map[ProofType]ProofUnmarshaler{}
)

// Register sets unmarshaler in map by provided ProofType.
func Register(proofType ProofType, unmarshaler ProofUnmarshaler) {
unmarshalersLk.Lock()
defer unmarshalersLk.Unlock()
defaultUnmarshalers[proofType] = unmarshaler
}
103 changes: 24 additions & 79 deletions fraud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

Expand All @@ -23,7 +22,7 @@ const (
// It implements the Service interface.
type service struct {
topicsLk sync.RWMutex
topics map[ProofType]*topic
topics map[ProofType]*pubsub.Topic

storesLk sync.RWMutex
stores map[ProofType]datastore.Datastore
Expand All @@ -37,60 +36,28 @@ func NewService(p *pubsub.PubSub, getter headerFetcher, ds datastore.Datastore)
return &service{
pubsub: p,
getter: getter,
topics: make(map[ProofType]*topic),
topics: make(map[ProofType]*pubsub.Topic),
stores: make(map[ProofType]datastore.Datastore),
ds: ds,
}
}

func (f *service) Subscribe(proofType ProofType) (Subscription, error) {
func (f *service) Subscribe(proofType ProofType) (_ Subscription, err error) {
f.topicsLk.Lock()
defer f.topicsLk.Unlock()
t, ok := f.topics[proofType]
f.topicsLk.Unlock()
if !ok {
return nil, fmt.Errorf("fraud: unmarshaler for %s proof is not registered", proofType)
}
return newSubscription(t)
}

func (f *service) RegisterUnmarshaler(proofType ProofType, u ProofUnmarshaler) error {
f.topicsLk.RLock()
_, ok := f.topics[proofType]
f.topicsLk.RUnlock()
if ok {
return fmt.Errorf("fraud: unmarshaler for %s proof is registered", proofType)
}

t, err := f.pubsub.Join(getSubTopic(proofType))
if err != nil {
return err
t, err = join(f.pubsub, proofType, f.processIncoming)
if err != nil {
return nil, err
}
f.topics[proofType] = t
}
err = f.pubsub.RegisterTopicValidator(
getSubTopic(proofType),
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
return f.processIncoming(ctx, proofType, msg)
},
)
subs, err := t.Subscribe()
if err != nil {
return err
}
f.topicsLk.Lock()
f.topics[proofType] = &topic{topic: t, unmarshal: u}
f.topicsLk.Unlock()
f.initStore(proofType)
return nil
}

func (f *service) UnregisterUnmarshaler(proofType ProofType) error {
f.topicsLk.Lock()
defer f.topicsLk.Unlock()
t, ok := f.topics[proofType]
if !ok {
return fmt.Errorf("fraud: unmarshaler for %s proof is not registered", proofType)
return nil, err
}
delete(f.topics, proofType)
return t.close()

return &subscription{subs}, nil
}

func (f *service) Broadcast(ctx context.Context, p Proof) error {
Expand All @@ -104,24 +71,20 @@ func (f *service) Broadcast(ctx context.Context, p Proof) error {
if !ok {
return fmt.Errorf("fraud: unmarshaler for %s proof is not registered", p.Type())
}
return t.publish(ctx, bin)
return t.Publish(ctx, bin)
}

func (f *service) processIncoming(
ctx context.Context,
proofType ProofType,
msg *pubsub.Message,
) pubsub.ValidationResult {
f.topicsLk.RLock()
t, ok := f.topics[proofType]
f.topicsLk.RUnlock()
if !ok {
panic("fraud: unmarshaler for the given proof type is not registered")
}
proof, err := t.unmarshal(msg.Data)
proof, err := Unmarshal(proofType, msg.Data)
if err != nil {
log.Errorw("failed to unmarshal fraud proof", err)
f.pubsub.BlacklistPeer(msg.ReceivedFrom)
log.Error(err)
if !errors.Is(err, &errNoUnmarshaler{}) {
f.pubsub.BlacklistPeer(msg.ReceivedFrom)
}
return pubsub.ValidationReject
}

Expand Down Expand Up @@ -151,6 +114,7 @@ func (f *service) processIncoming(
"hash", hex.EncodeToString(extHeader.DAH.Hash()),
"from", msg.ReceivedFrom.String(),
)
msg.ValidatorData = proof
f.storesLk.RLock()
store, ok := f.stores[proofType]
f.storesLk.RUnlock()
Expand All @@ -167,32 +131,13 @@ func (f *service) processIncoming(
}

func (f *service) Get(ctx context.Context, proofType ProofType) ([]Proof, error) {
f.storesLk.RLock()
store, ok := f.stores[proofType]
f.storesLk.RUnlock()
if !ok {
return nil, fmt.Errorf("fraud: proof type %s is not supported", proofType)
}

f.topicsLk.RLock()
t, ok := f.topics[proofType]
f.topicsLk.RUnlock()
if !ok {
return nil, fmt.Errorf("fraud: unmarshaler for proof type %s is not registered", proofType)
}

return getAll(ctx, store, t.unmarshal)
}

func (f *service) initStore(proofType ProofType) {
f.storesLk.Lock()
defer f.storesLk.Unlock()
_, ok := f.stores[proofType]
store, ok := f.stores[proofType]
if !ok {
f.stores[proofType] = namespace.Wrap(f.ds, makeKey(proofType))
store = namespace.Wrap(f.ds, makeKey(proofType))
f.stores[proofType] = store
}
}
f.storesLk.Unlock()

func getSubTopic(p ProofType) string {
return p.String() + "-sub"
return getAll(ctx, store, proofType)
}
Loading

0 comments on commit da5a5a4

Please sign in to comment.