Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fraud: implement fraud service #811

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fraud/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ package fraud

import (
"context"

logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/header"
)

var log = logging.Logger("fraud")

// headerFetcher aliases a function that is used to fetch an ExtendedHeader from store by height.
type headerFetcher func(context.Context, uint64) (*header.ExtendedHeader, error)

// ProofUnmarshaler aliases a function that parses data to `Proof`.
type ProofUnmarshaler func([]byte) (Proof, error)

Expand Down
147 changes: 147 additions & 0 deletions fraud/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package fraud

import (
"context"
"encoding/hex"
"errors"
"sync"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// service is responsible for validating and propagating Fraud Proofs.
// It implements the Service interface.
type service struct {
mu sync.RWMutex
unmarshalers map[ProofType]ProofUnmarshaler

topics *topics
getter headerFetcher
}

func NewService(p *pubsub.PubSub, getter headerFetcher) Service {
return &service{
topics: &topics{
pubsub: p,
pubSubTopics: make(map[ProofType]*pubsub.Topic),
},
unmarshalers: make(map[ProofType]ProofUnmarshaler),
getter: getter,
}
}

func (f *service) Subscribe(proofType ProofType) (Subscription, error) {
// TODO: @vgonkivs check if fraud proof is in fraud store, then return with error
f.mu.Lock()
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
u, ok := f.unmarshalers[proofType]
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil, errors.New("fraud: unmarshaler is not registered")
}

t, ok := f.topics.getTopic(proofType)

// if topic was not stored in cache, then we should register a validator and
// join pubsub topic.
if !ok {
var err error
t, err = f.topics.join(proofType)
if err != nil {
f.mu.Unlock()
return nil, err
}
err = f.topics.registerValidator(proofType, f.processIncoming)
if err != nil {
f.mu.Unlock()
return nil, err
}
}
f.mu.Unlock()
return newSubscription(t, u)
}

func (f *service) RegisterUnmarshaler(proofType ProofType, u ProofUnmarshaler) error {
f.mu.Lock()
defer f.mu.Unlock()

if _, ok := f.unmarshalers[proofType]; ok {
return errors.New("fraud: unmarshaler is already registered")
}
f.unmarshalers[proofType] = u

return nil
}

func (f *service) UnregisterUnmarshaler(proofType ProofType) error {
f.mu.Lock()
defer f.mu.Unlock()

if _, ok := f.unmarshalers[proofType]; !ok {
return errors.New("fraud: unmarshaler is not registered")
}
delete(f.unmarshalers, proofType)

return nil

Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

func (f *service) Broadcast(ctx context.Context, p Proof) error {
bin, err := p.MarshalBinary()
if err != nil {
return err
}

return f.topics.publish(ctx, bin, p.Type())
}

func (f *service) processIncoming(
ctx context.Context,
proofType ProofType,
msg *pubsub.Message,
) pubsub.ValidationResult {
f.mu.RLock()
unmarshaler, ok := f.unmarshalers[proofType]
f.mu.RUnlock()
if !ok {
log.Error("unmarshaler is not found")
return pubsub.ValidationReject
}
proof, err := unmarshaler(msg.Data)
if err != nil {
log.Errorw("unmarshaling header error", err)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
return pubsub.ValidationReject
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

// create a timeout for block fetching since our validator will be called synchronously
// and getter is a blocking function.
newCtx, cancel := context.WithTimeout(ctx, time.Minute*2)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
extHeader, err := f.getter(newCtx, proof.Height())
if err != nil {
cancel()
// Timeout means there is a problem with the network.
// As we cannot prove or discard Fraud Proof, user must restart the node.
if errors.Is(err, context.DeadlineExceeded) {
log.Fatal("could not get block. Timeout reached. Please restart your node.")
}
log.Errorw("failed to fetch block: ",
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
"err", err)
return pubsub.ValidationReject
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}
defer cancel()
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
err = proof.Validate(extHeader)
if err != nil {
log.Errorw("validation err: ",
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
"err", err)
f.topics.addPeerToBlacklist(msg.ReceivedFrom)
return pubsub.ValidationReject
}
log.Warnw("received an inbound proof", "kind", proof.Type(),
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
"hash", hex.EncodeToString(extHeader.DAH.Hash()),
"from", msg.ReceivedFrom.String(),
)
return pubsub.ValidationAccept
}

func getSubTopic(p ProofType) string {
return p.String() + "-sub"
}
38 changes: 38 additions & 0 deletions fraud/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package fraud

import (
"context"

pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// subscription handles Fraud Proof from the pubsub topic.
type subscription struct {
subscription *pubsub.Subscription
unmarshaler ProofUnmarshaler
}

func newSubscription(topic *pubsub.Topic, u ProofUnmarshaler) (*subscription, error) {
sub, err := topic.Subscribe()
if err != nil {
return nil, err
}

return &subscription{sub, u}, nil
}

func (s *subscription) Proof(ctx context.Context) (Proof, error) {
data, err := s.subscription.Next(ctx)
if err != nil {
return nil, err
}
proof, err := s.unmarshaler(data.Data)
if err != nil {
return nil, err
}
return proof, nil
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *subscription) Cancel() {
s.subscription.Cancel()
}
73 changes: 73 additions & 0 deletions fraud/topics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package fraud

import (
"context"
"errors"
"sync"

"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// topics allows to operate with pubsub connection and pubsub topics.
type topics struct {
mu sync.RWMutex
pubSubTopics map[ProofType]*pubsub.Topic

pubsub *pubsub.PubSub
}

// join subscribes to the topic of the given proof type.
func (t *topics) join(proofType ProofType) (*pubsub.Topic, error) {
t.mu.Lock()
defer t.mu.Unlock()

topic, err := t.pubsub.Join(getSubTopic(proofType))
if err != nil {
return nil, err
}
log.Debugf("successfully subscribed to topic: %s", getSubTopic(proofType))
t.pubSubTopics[proofType] = topic
return topic, nil
}

// getTopic returns pubsub topic.
func (t *topics) getTopic(proofType ProofType) (*pubsub.Topic, bool) {
t.mu.RLock()
defer t.mu.RUnlock()
topic, ok := t.pubSubTopics[proofType]
return topic, ok
}

// publish allows to publish Fraud Proofs to the network
func (t *topics) publish(ctx context.Context, data []byte, proofType ProofType) error {
t.mu.RLock()
defer t.mu.RUnlock()

if topic, ok := t.pubSubTopics[proofType]; ok {
return topic.Publish(ctx, data)
}

return errors.New("fraud: topic is not found")
}

// registerValidator adds an internal validation to topic inside libp2p for provided ProofType.
func (t *topics) registerValidator(
proofType ProofType,
val func(context.Context, ProofType, *pubsub.Message) pubsub.ValidationResult,
) error {
return t.pubsub.RegisterTopicValidator(
getSubTopic(proofType),
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
return val(ctx, proofType, msg)
},
// make validation synchronous.
pubsub.WithValidatorInline(true),
)
}

// addPeerToBlacklist adds a peer to pubsub blacklist to avoid receiving messages
// from it in the future.
func (t *topics) addPeerToBlacklist(peer peer.ID) {
t.pubsub.BlacklistPeer(peer)
}