Skip to content

Commit

Permalink
Merge pull request #53 from renaynay/subjective-init
Browse files Browse the repository at this point in the history
feat(p2p)!: Extend Head interface's Head method with ...HeadOption, introduce WithTrustedHead opt
  • Loading branch information
renaynay authored Aug 3, 2023
2 parents 6cf3094 + bb1bed9 commit fe54fd9
Show file tree
Hide file tree
Showing 16 changed files with 353 additions and 60 deletions.
4 changes: 2 additions & 2 deletions headertest/dummy_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ func (d *DummyHeader) Verify(header header.Header) error {
}

if header.Height() <= d.Height() {
return fmt.Errorf("expected new header Height to be larger than old header Time")
return fmt.Errorf("expected new header Height %d to be larger than old header Height %d", header.Height(), d.Height())
}

if header.Time().Before(d.Time()) {
return fmt.Errorf("expected new header Time to be after old header Time")
return fmt.Errorf("expected new header Time %v to be after old header Time %v", header.Time(), d.Time())
}

return nil
Expand Down
1 change: 1 addition & 0 deletions headertest/dummy_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *DummySuite) NextHeader() *DummyHeader {
}

dh := RandDummyHeader(s.t)
dh.Raw.Time = s.head.Time().Add(time.Nanosecond)
dh.Raw.Height = s.head.Height() + 1
dh.Raw.PreviousHash = s.head.Hash()
_ = dh.rehash()
Expand Down
2 changes: 1 addition & 1 deletion headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *Store[H]) Height() uint64 {
return uint64(m.HeadHeight)
}

func (m *Store[H]) Head(context.Context) (H, error) {
func (m *Store[H]) Head(context.Context, ...header.HeadOption) (H, error) {
return m.Headers[m.HeadHeight], nil
}

Expand Down
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,5 @@ type Getter[H Header] interface {
// reporting it.
type Head[H Header] interface {
// Head returns the latest known header.
Head(context.Context) (H, error)
Head(context.Context, ...HeadOption) (H, error)
}
2 changes: 1 addition & 1 deletion local/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (l *Exchange[H]) Stop(context.Context) error {
return nil
}

func (l *Exchange[H]) Head(ctx context.Context) (H, error) {
func (l *Exchange[H]) Head(ctx context.Context, _ ...header.HeadOption) (H, error) {
return l.store.Head(ctx)
}

Expand Down
17 changes: 17 additions & 0 deletions opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package header

type HeadOption func(opts *HeadParams)

// HeadParams contains options to be used for Head interface methods
type HeadParams struct {
// TrustedHead allows the caller of Head to specify a trusted header
// against which the underlying implementation of Head can verify against.
TrustedHead Header
}

// WithTrustedHead sets the TrustedHead parameter to the given header.
func WithTrustedHead(verified Header) func(opts *HeadParams) {
return func(opts *HeadParams) {
opts.TrustedHead = verified
}
}
85 changes: 55 additions & 30 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package p2p
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sort"
Expand All @@ -21,10 +20,15 @@ import (

var log = logging.Logger("header/p2p")

// the minimum number of headers of the same height received from trusted peers
// to determine the network head. If all trusted header will return headers with
// non-equal height, then the highest header will be chosen.
const minTrustedHeadResponses = 2
// minHeadResponses is the minimum number of headers of the same height
// received from peers to determine the network head. If all trusted peers
// will return headers with non-equal height, then the highest header will be
// chosen.
const minHeadResponses = 2

// maxUntrustedHeadRequests is the number of head requests to be made to
// the network in order to determine the network head.
var maxUntrustedHeadRequests = 4

// Exchange enables sending outbound HeaderRequests to the network as well as
// handling inbound HeaderRequests from the network.
Expand Down Expand Up @@ -72,26 +76,16 @@ func NewExchange[H header.Header](
return ex, nil
}

func (ex *Exchange[H]) Start(context.Context) error {
func (ex *Exchange[H]) Start(ctx context.Context) error {
ex.ctx, ex.cancel = context.WithCancel(context.Background())
log.Infow("client: starting client", "protocol ID", ex.protocolID)

trustedPeers := ex.trustedPeers()

for _, p := range trustedPeers {
// Try to pre-connect to trusted peers.
// We don't really care if we succeed at this point
// and just need any peers in the peerTracker asap
go func(p peer.ID) {
err := ex.host.Connect(ex.ctx, peer.AddrInfo{ID: p})
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
log.Debugw("err connecting to a bootstrap peer", "err", err, "peer", p)
}
}(p)
}
go ex.peerTracker.gc()
go ex.peerTracker.track()
return nil

// bootstrap the peerTracker with trusted peers as well as previously seen
// peers if provided.
return ex.peerTracker.bootstrap(ctx, ex.trustedPeers())
}

func (ex *Exchange[H]) Stop(ctx context.Context) error {
Expand All @@ -106,7 +100,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// The Head must be verified thereafter where possible.
// We request in parallel all the trusted peers, compare their response
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption) (H, error) {
log.Debug("requesting head")

reqCtx := ctx
Expand All @@ -121,30 +115,61 @@ func (ex *Exchange[H]) Head(ctx context.Context) (H, error) {
defer cancel()
}

reqParams := header.HeadParams{}
for _, opt := range opts {
opt(&reqParams)
}

peers := ex.trustedPeers()

// the TrustedHead field indicates whether the Exchange should use
// trusted peers for its Head request. If nil, trusted peers will
// be used. If non-nil, Exchange will ask several peers from its network for
// their Head and verify against the given trusted header.
useTrackedPeers := reqParams.TrustedHead != nil
if useTrackedPeers {
trackedPeers := ex.peerTracker.getPeers(maxUntrustedHeadRequests)
if len(trackedPeers) > 0 {
peers = trackedPeers
log.Debugw("requesting head from tracked peers", "amount", len(peers))
}
}

var (
zero H
trustedPeers = ex.trustedPeers()
headerRespCh = make(chan H, len(trustedPeers))
headerReq = &p2p_pb.HeaderRequest{
zero H
headerReq = &p2p_pb.HeaderRequest{
Data: &p2p_pb.HeaderRequest_Origin{Origin: uint64(0)},
Amount: 1,
}
headerRespCh = make(chan H, len(peers))
)
for _, from := range trustedPeers {
for _, from := range peers {
go func(from peer.ID) {
headers, err := ex.request(reqCtx, from, headerReq)
if err != nil {
log.Errorw("head request to trusted peer failed", "trustedPeer", from, "err", err)
log.Errorw("head request to peer failed", "peer", from, "err", err)
headerRespCh <- zero
return
}
// if tracked (untrusted) peers were requested, verify head
if useTrackedPeers {
err = reqParams.TrustedHead.Verify(headers[0])
if err != nil {
log.Errorw("verifying head received from tracked peer", "tracked peer", from,
"err", err)
// bad head was given, block peer
ex.peerTracker.blockPeer(from, fmt.Errorf("returned bad head: %w", err))
headerRespCh <- zero
return
}
}
// request ensures that the result slice will have at least one Header
headerRespCh <- headers[0]
}(from)
}

headers := make([]H, 0, len(trustedPeers))
for range trustedPeers {
headers := make([]H, 0, len(peers))
for range peers {
select {
case h := <-headerRespCh:
if !h.IsZero() {
Expand Down Expand Up @@ -346,7 +371,7 @@ func bestHead[H header.Header](result []H) (H, error) {

// try to find Header with the maximum height that was received at least from 2 peers
for _, res := range result {
if counter[res.Hash().String()] >= minTrustedHeadResponses {
if counter[res.Hash().String()] >= minHeadResponses {
return res, nil
}
}
Expand Down
67 changes: 59 additions & 8 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"strconv"
"testing"
"time"

Expand All @@ -18,23 +19,73 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

const networkID = "private"

func TestExchange_RequestHead(t *testing.T) {
hosts := createMocknet(t, 2)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
// perform header request
header, err := exchg.Head(context.Background())
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

hosts := createMocknet(t, 3)
exchg, trustedStore := createP2PExAndServer(t, hosts[0], hosts[1])

// create new server-side exchange that will act as the tracked peer
// it will have a higher chain head than the trusted peer so that the
// test can determine which peer was asked
trackedStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 50)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](hosts[2], trackedStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
err = serverSideEx.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = serverSideEx.Stop(ctx)
require.NoError(t, err)
})

assert.Equal(t, store.Headers[store.HeadHeight].Height(), header.Height())
assert.Equal(t, store.Headers[store.HeadHeight].Hash(), header.Hash())
tests := []struct {
requestFromTrusted bool
lastHeader header.Header
expectedHeight int64
expectedHash header.Hash
}{
// routes to trusted peer only
{
requestFromTrusted: true,
lastHeader: trustedStore.Headers[trustedStore.HeadHeight-1],
expectedHeight: trustedStore.HeadHeight,
expectedHash: trustedStore.Headers[trustedStore.HeadHeight].Hash(),
},
// routes to tracked peers and takes highest chain head
{
requestFromTrusted: false,
lastHeader: trackedStore.Headers[trackedStore.HeadHeight-1],
expectedHeight: trackedStore.HeadHeight,
expectedHash: trackedStore.Headers[trackedStore.HeadHeight].Hash(),
},
}

for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var opts []header.HeadOption
if !tt.requestFromTrusted {
opts = append(opts, header.WithTrustedHead(tt.lastHeader))
}

header, err := exchg.Head(ctx, opts...)
require.NoError(t, err)

assert.Equal(t, tt.expectedHeight, header.Height())
assert.Equal(t, tt.expectedHash, header.Hash())
})
}
}

func TestExchange_RequestHead_UnresponsivePeer(t *testing.T) {
Expand Down Expand Up @@ -532,7 +583,7 @@ func (t *timedOutStore) HasAt(_ context.Context, _ uint64) bool {
return true
}

func (t *timedOutStore) Head(_ context.Context) (*headertest.DummyHeader, error) {
func (t *timedOutStore) Head(context.Context, ...header.HeadOption) (*headertest.DummyHeader, error) {
time.Sleep(t.timeout)
return nil, header.ErrNoHead
}
Loading

0 comments on commit fe54fd9

Please sign in to comment.