Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fraud: add peer that sent an invalid fraud proof to black list #966

Merged
merged 8 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions fraud/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ func getSubTopic(p ProofType) string {
}

func join(p *pubsub.PubSub, proofType ProofType,
validate func(context.Context, ProofType, *pubsub.Message) pubsub.ValidationResult) (*pubsub.Topic, error) {
validate func(context.Context, ProofType, peer.ID, *pubsub.Message) pubsub.ValidationResult) (*pubsub.Topic, error) {
t, err := p.Join(getSubTopic(proofType))
if err != nil {
return nil, err
}
err = p.RegisterTopicValidator(
getSubTopic(proofType),
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
return validate(ctx, proofType, msg)
func(ctx context.Context, from peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
return validate(ctx, proofType, from, msg)
},
)
return t, err
Expand Down
4 changes: 3 additions & 1 deletion fraud/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

logging "github.com/ipfs/go-log/v2"

pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/celestia-node/header"
)

Expand All @@ -28,7 +30,7 @@ type Service interface {
type Broadcaster interface {
// Broadcast takes a fraud `Proof` data structure that implements standard BinaryMarshal
// interface and broadcasts it to all subscribed peers.
Broadcast(context.Context, Proof) error
Broadcast(context.Context, Proof, ...pubsub.PubOpt) error
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

// Subscriber encompasses the behavior necessary to
Expand Down
12 changes: 7 additions & 5 deletions fraud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func (f *service) Subscribe(proofType ProofType) (_ Subscription, err error) {
return &subscription{subs}, nil
}

func (f *service) Broadcast(ctx context.Context, p Proof) error {
func (f *service) Broadcast(ctx context.Context, p Proof, opts ...pubsub.PubOpt) error {
bin, err := p.MarshalBinary()
if err != nil {
return err
Expand All @@ -71,19 +72,20 @@ func (f *service) Broadcast(ctx context.Context, p Proof) error {
if !ok {
return fmt.Errorf("fraud: unmarshaler for %s proof is not registered", p.Type())
}
return t.Publish(ctx, bin)
return t.Publish(ctx, bin, opts...)
}

func (f *service) processIncoming(
ctx context.Context,
proofType ProofType,
from peer.ID,
msg *pubsub.Message,
) pubsub.ValidationResult {
proof, err := Unmarshal(proofType, msg.Data)
if err != nil {
log.Error(err)
if !errors.Is(err, &errNoUnmarshaler{}) {
f.pubsub.BlacklistPeer(msg.ReceivedFrom)
f.pubsub.BlacklistPeer(from)
}
return pubsub.ValidationReject
}
Expand All @@ -106,13 +108,13 @@ func (f *service) processIncoming(
if err != nil {
log.Errorw("proof validation err: ",
"err", err, "proofType", proof.Type(), "height", proof.Height())
f.pubsub.BlacklistPeer(msg.ReceivedFrom)
f.pubsub.BlacklistPeer(from)
return pubsub.ValidationReject
}
log.Warnw("received fraud proof", "proofType", proof.Type(),
"height", proof.Height(),
"hash", hex.EncodeToString(extHeader.DAH.Hash()),
"from", msg.ReceivedFrom.String(),
"from", from.String(),
)
msg.ValidatorData = proof
f.storesLk.RLock()
Expand Down
206 changes: 204 additions & 2 deletions fraud/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

Expand Down Expand Up @@ -56,18 +60,216 @@ func TestService_Broadcast(t *testing.T) {
require.NoError(t, p.Validate(faultHeader))
}

func TestService_BlackListPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer t.Cleanup(cancel)
// create mock network
net, err := mocknet.FullMeshLinked(3)
require.NoError(t, err)
bServ := mdutils.Bserv()

// create first fraud service that will broadcast incorrect Fraud Proof
serviceA, store1 := createServiceWithHost(t, net.Hosts()[0])

h, err := store1.GetByHeight(context.TODO(), 1)
require.NoError(t, err)

// create and break byzantine error
_, err = generateByzantineError(ctx, t, h, bServ)
require.Error(t, err)
var errByz *ipld.ErrByzantine
require.True(t, errors.As(err, &errByz))
errByz.Index = 2

fserviceA := serviceA.(*service)
require.NotNil(t, fserviceA)

// create second service that will receive and validate Fraud Proof
serviceB, _ := createServiceWithHost(t, net.Hosts()[1])

fserviceB := serviceB.(*service)
require.NotNil(t, fserviceB)

bl, err := pubsub.NewTimeCachedBlacklist(time.Hour * 1)
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
// create pub sub in order to listen for Fraud Proof
psC, err := pubsub.NewGossipSub(ctx, net.Hosts()[2], // -> C
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), pubsub.WithBlacklist(bl))
require.NoError(t, err)

addrB := host.InfoFromHost(net.Hosts()[1]) // -> B

serviceC := NewService(psC, store1.GetByHeight, sync.MutexWrap(datastore.NewMapDatastore()))

sub0, err := net.Hosts()[0].EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)
sub2, err := net.Hosts()[2].EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)

// connect peers: A -> B -> C, so A and C are not connected to each other
require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // host[0] is A
require.NoError(t, net.Hosts()[2].Connect(ctx, *addrB)) // host[2] is C

// wait on both peer identification events
for i := 0; i < 2; i++ {
select {
case <-sub0.Out():
case <-sub2.Out():
case <-ctx.Done():
assert.FailNow(t, "timeout waiting for peers to connect")
}
}

_, err = serviceA.Subscribe(BadEncoding)
require.NoError(t, err)

subsB, err := serviceB.Subscribe(BadEncoding)
require.NoError(t, err)
defer subsB.Cancel()

subsC, err := serviceC.Subscribe(BadEncoding)
require.NoError(t, err)
defer subsC.Cancel()

befp := CreateBadEncodingProof([]byte("hash"), uint64(h.Height), errByz)
// deregister validator in order to send Fraud Proof
fserviceA.pubsub.UnregisterTopicValidator(getSubTopic(BadEncoding)) //nolint:errcheck
// create a new validator for serviceB
fserviceB.pubsub.UnregisterTopicValidator(getSubTopic(BadEncoding)) //nolint:errcheck
f := func(ctx context.Context, from peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
msg.ValidatorData = befp
return pubsub.ValidationAccept
}
fserviceB.pubsub.RegisterTopicValidator(getSubTopic(BadEncoding), f) //nolint:errcheck
err = fserviceA.Broadcast(ctx, befp, pubsub.WithReadiness(pubsub.MinTopicSize(1)))
require.NoError(t, err)

_, err = subsB.Proof(ctx)
require.NoError(t, err)

newCtx, cancel := context.WithTimeout(ctx, time.Millisecond*500)
t.Cleanup(cancel)
_, err = subsC.Proof(newCtx)
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
require.Error(t, err)
require.True(t, bl.Contains(net.Hosts()[1].ID()))
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

func TestService_GossipingOfFaultBEFP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer t.Cleanup(cancel)
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
// create mock network
net, err := mocknet.FullMeshLinked(3)
require.NoError(t, err)
bServ := mdutils.Bserv()

// create first fraud service that will broadcast incorrect Fraud Proof
serviceA, store1 := createServiceWithHost(t, net.Hosts()[0])

h, err := store1.GetByHeight(context.TODO(), 1)
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

// create and break byzantine error
_, err = generateByzantineError(ctx, t, h, bServ)
require.Error(t, err)
var errByz *ipld.ErrByzantine
require.True(t, errors.As(err, &errByz))
errByz.Index = 2

fserviceA := serviceA.(*service)
require.NotNil(t, fserviceA)

bl, err := pubsub.NewTimeCachedBlacklist(time.Hour)
require.NoError(t, err)
// create pub sub in order to listen for Fraud Proof
psB, err := pubsub.NewGossipSub(ctx, net.Hosts()[1], // -> B
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), pubsub.WithBlacklist(bl))
require.NoError(t, err)
// create second service that will receive and validate Fraud Proof
serviceB := NewService(psB, store1.GetByHeight, sync.MutexWrap(datastore.NewMapDatastore()))
fserviceB := serviceB.(*service)
require.NotNil(t, fserviceB)
addrB := host.InfoFromHost(net.Hosts()[1]) // -> B

// create pub sub in order to listen for Fraud Proof
psC, err := pubsub.NewGossipSub(ctx, net.Hosts()[2], // -> C
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
serviceC := NewService(psC, store1.GetByHeight, sync.MutexWrap(datastore.NewMapDatastore()))

// perform subscriptions
sub0, err := net.Hosts()[0].EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)
sub2, err := net.Hosts()[2].EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)

// establish connections
// connect peers: A -> B -> C, so A and C are not connected to each other
require.NoError(t, net.Hosts()[0].Connect(ctx, *addrB)) // host[0] is A
require.NoError(t, net.Hosts()[2].Connect(ctx, *addrB)) // host[2] is C

// wait on both peer identification events
for i := 0; i < 2; i++ {
select {
case <-sub0.Out():
case <-sub2.Out():
case <-ctx.Done():
assert.FailNow(t, "timeout waiting for peers to connect")
}
}

// subscribe to BEFP
_, err = serviceA.Subscribe(BadEncoding)
require.NoError(t, err)

subsB, err := serviceB.Subscribe(BadEncoding)
require.NoError(t, err)
defer subsB.Cancel()

subsC, err := serviceC.Subscribe(BadEncoding)
require.NoError(t, err)
defer subsC.Cancel()
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

// deregister validator in order to send Fraud Proof
fserviceA.pubsub.UnregisterTopicValidator(getSubTopic(BadEncoding)) //nolint:errcheck
// Broadcast BEFP
err = fserviceA.Broadcast(ctx, CreateBadEncodingProof([]byte("hash"), uint64(h.Height), errByz),
pubsub.WithReadiness(pubsub.MinTopicSize(1)))
require.NoError(t, err)

newCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
t.Cleanup(cancel)
_, err = subsB.Proof(newCtx)
require.Error(t, err)
require.True(t, bl.Contains(net.Hosts()[0].ID()))

proofs, err := serviceC.Get(ctx, BadEncoding)
require.Error(t, err)
require.Nil(t, proofs)
}

func createService(t *testing.T) (Service, *mockStore) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t.Cleanup(cancel)

// create mock network
net, err := mocknet.FullMeshLinked(2)
net, err := mocknet.FullMeshLinked(1)
require.NoError(t, err)

// create pubsub for host
ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0],
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
store := createStore(t, 10)
return NewService(ps, store.GetByHeight, sync.MutexWrap(datastore.NewMapDatastore())), store
}

func createServiceWithHost(t *testing.T, host host.Host) (Service, *mockStore) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t.Cleanup(cancel)

// create pubsub for host
ps, err := pubsub.NewGossipSub(ctx, host,
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
store := createStore(t, 10)
return NewService(ps, store.GetByHeight, sync.MutexWrap(datastore.NewMapDatastore())), store
}
3 changes: 2 additions & 1 deletion fraud/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ipfs/go-blockservice"
pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/ipld"
Expand All @@ -13,7 +14,7 @@ import (
type DummyService struct {
}

func (d *DummyService) Broadcast(context.Context, Proof) error {
func (d *DummyService) Broadcast(context.Context, Proof, ...pubsub.PubOpt) error {
return nil
}

Expand Down