Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fraud: implement fraud service #811

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fraud/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ package fraud

import (
"context"

logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/header"
)

var log = logging.Logger("fraud")

// headerFetcher aliases a function that is used to fetch an ExtendedHeader from store by height.
type headerFetcher func(context.Context, uint64) (*header.ExtendedHeader, error)

// ProofUnmarshaler aliases a function that parses data to `Proof`.
type ProofUnmarshaler func([]byte) (Proof, error)

Expand Down
130 changes: 130 additions & 0 deletions fraud/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package fraud

import (
"context"
"encoding/hex"
"errors"
"sync"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// service is responsible for validating and propagating Fraud Proofs.
// It implements the Service interface.
type service struct {
pubsub *pubsub.PubSub
getter headerFetcher

mu sync.RWMutex
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
topics map[ProofType]*topic
}

func NewService(p *pubsub.PubSub, getter headerFetcher) Service {
return &service{
pubsub: p,
getter: getter,
topics: make(map[ProofType]*topic),
}
}

func (f *service) Subscribe(proofType ProofType) (Subscription, error) {
// TODO: @vgonkivs check if fraud proof is in fraud store, then return with error
f.mu.Lock()
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
t, ok := f.topics[proofType]
f.mu.Unlock()
if !ok {
return nil, errors.New("fraud: topic was not created")
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
}
return newSubscription(t)
}

func (f *service) RegisterUnmarshaler(proofType ProofType, u ProofUnmarshaler) error {
f.mu.Lock()
defer f.mu.Unlock()

t, err := createTopic(f.pubsub, proofType, u, f.processIncoming)
if err != nil {
return err
}
f.topics[proofType] = t
return nil
}

func (f *service) UnregisterUnmarshaler(proofType ProofType) error {
f.mu.Lock()
defer f.mu.Unlock()
t, ok := f.topics[proofType]
if !ok {
return errors.New("fraud: topic was not created")
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}
delete(f.topics, proofType)
return t.close()

Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

func (f *service) Broadcast(ctx context.Context, p Proof) error {
bin, err := p.MarshalBinary()
if err != nil {
return err
}
f.mu.RLock()
t, ok := f.topics[p.Type()]
f.mu.RUnlock()
if !ok {
return errors.New("fraud: topic was not created")
}
return t.publish(ctx, bin)
}

func (f *service) processIncoming(
ctx context.Context,
proofType ProofType,
msg *pubsub.Message,
) pubsub.ValidationResult {
f.mu.RLock()
t, ok := f.topics[proofType]
f.mu.RUnlock()
if !ok {
log.Error("topic was not created")
return pubsub.ValidationReject
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
}
proof, err := t.unmarshal(msg.Data)
if err != nil {
log.Errorw("unmarshaling header error", err)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
return pubsub.ValidationReject
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

// create a timeout for block fetching since our validator will be called synchronously
// and getter is a blocking function.
newCtx, cancel := context.WithTimeout(ctx, time.Minute*2)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
extHeader, err := f.getter(newCtx, proof.Height())
if err != nil {
cancel()
// Timeout means there is a problem with the network.
// As we cannot prove or discard Fraud Proof, user must restart the node.
if errors.Is(err, context.DeadlineExceeded) {
log.Fatal("could not get block. Timeout reached. Please restart your node.")
}
log.Errorw("failed to fetch block: ",
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
"err", err)
return pubsub.ValidationReject
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}
defer cancel()
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
err = proof.Validate(extHeader)
if err != nil {
log.Errorw("validation err: ",
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
"err", err)
f.pubsub.BlacklistPeer(msg.ReceivedFrom)
return pubsub.ValidationReject
}
log.Warnw("received an inbound proof", "kind", proof.Type(),
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
"hash", hex.EncodeToString(extHeader.DAH.Hash()),
"from", msg.ReceivedFrom.String(),
)
return pubsub.ValidationAccept
}

func getSubTopic(p ProofType) string {
return p.String() + "-sub"
}
38 changes: 38 additions & 0 deletions fraud/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package fraud

import (
"context"

pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// subscription handles Fraud Proof from the pubsub topic.
type subscription struct {
subscription *pubsub.Subscription
unmarshaler ProofUnmarshaler
}

func newSubscription(t *topic) (*subscription, error) {
sub, err := t.topic.Subscribe()
if err != nil {
return nil, err
}

return &subscription{sub, t.unmarshal}, nil
}

func (s *subscription) Proof(ctx context.Context) (Proof, error) {
data, err := s.subscription.Next(ctx)
if err != nil {
return nil, err
}
proof, err := s.unmarshaler(data.Data)
if err != nil {
return nil, err
}
return proof, nil
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *subscription) Cancel() {
s.subscription.Cancel()
}
57 changes: 57 additions & 0 deletions fraud/topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package fraud

import (
"context"

"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// topic holds pubsub topic and unmarshaler of corresponded Fraud Proof
type topic struct {
topic *pubsub.Topic
unmarshal ProofUnmarshaler
}

// publish allows to publish Fraud Proofs to the network
func (t *topic) publish(ctx context.Context, data []byte) error {
return t.topic.Publish(ctx, data)
}

// close removes unmarshaler and closes pubsub topic
func (t *topic) close() error {
t.unmarshal = nil
return t.topic.Close()
}

func createTopic(
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
p *pubsub.PubSub,
proofType ProofType,
unmarshal ProofUnmarshaler,
val func(context.Context, ProofType, *pubsub.Message) pubsub.ValidationResult) (*topic, error) {
t, err := p.Join(getSubTopic(proofType))
if err != nil {
return nil, err
}
if err = registerValidator(p, proofType, val); err != nil {
return nil, err
}
log.Debugf("successfully subscribed to topic: %s", getSubTopic(proofType))
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
return &topic{topic: t, unmarshal: unmarshal}, nil
}

// registerValidator adds an internal validation to topic inside libp2p for provided ProofType.
func registerValidator(
p *pubsub.PubSub,
proofType ProofType,
val func(context.Context, ProofType, *pubsub.Message) pubsub.ValidationResult,
) error {
return p.RegisterTopicValidator(
getSubTopic(proofType),
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
return val(ctx, proofType, msg)
},
// make validation synchronous.
pubsub.WithValidatorInline(true),
)
}
Wondertan marked this conversation as resolved.
Show resolved Hide resolved