Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: add qgb dht wait for peers (#141)
Browse files Browse the repository at this point in the history
* feat: add qgb dht wait for peers

* fix: close dhts in test
  • Loading branch information
rach-id authored Feb 16, 2023
1 parent 0b1f39b commit 7f8bb72
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 3 deletions.
42 changes: 40 additions & 2 deletions p2p/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

tmlog "github.com/tendermint/tendermint/libs/log"

"github.com/celestiaorg/orchestrator-relayer/types"

ds "github.com/ipfs/go-datastore"
Expand All @@ -21,10 +23,11 @@ const (
// Used to add helper methods to easily handle the DHT.
type QgbDHT struct {
*dht.IpfsDHT
logger tmlog.Logger
}

// NewQgbDHT create a new IPFS DHT using a suitable configuration for the QGB.
func NewQgbDHT(ctx context.Context, h host.Host, store ds.Batching) (*QgbDHT, error) {
func NewQgbDHT(ctx context.Context, h host.Host, store ds.Batching, logger tmlog.Logger) (*QgbDHT, error) {
router, err := dht.New(
ctx,
h,
Expand All @@ -39,7 +42,42 @@ func NewQgbDHT(ctx context.Context, h host.Host, store ds.Batching) (*QgbDHT, er
return nil, err
}

return &QgbDHT{router}, nil
return &QgbDHT{
IpfsDHT: router,
logger: logger,
}, nil
}

// WaitForPeers waits for peers to be connected to the DHT.
// Returns nil if the context is done or the peers list has more peers than the specified peersThreshold.
// Returns error if it times out.
func (q QgbDHT) WaitForPeers(ctx context.Context, timeout time.Duration, rate time.Duration, peersThreshold int) error {
if peersThreshold < 1 {
return ErrPeersThresholdCannotBeNegative
}

t := time.After(timeout)
ticker := time.NewTicker(rate)
for {
select {
case <-ctx.Done():
return nil
case <-t:
return ErrPeersTimeout
case <-ticker.C:
peersLen := len(q.RoutingTable().ListPeers())
if peersLen >= peersThreshold {
return nil
}
q.logger.Info(
"waiting for routing table to populate",
"target number of peers",
peersThreshold,
"current number",
peersLen,
)
}
}
}

// Note: The Get and Put methods do not run any validations on the data commitment confirms
Expand Down
30 changes: 30 additions & 0 deletions p2p/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package p2p_test
import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/orchestrator-relayer/p2p"
qgbtesting "github.com/celestiaorg/orchestrator-relayer/testing"
Expand Down Expand Up @@ -137,3 +141,29 @@ func TestNetworkGetNonExistentValsetConfirm(t *testing.T) {
assert.Error(t, err)
assert.True(t, types.IsEmptyValsetConfirm(actualConfirm))
}

func TestWaitForPeers(t *testing.T) {
ctx := context.Background()
// create first dht
h1, _, dht1 := qgbtesting.NewTestDHT(ctx)
defer dht1.Close()

// wait for peers
err := dht1.WaitForPeers(ctx, 10*time.Millisecond, time.Millisecond, 1)
// should error because no peer is connected to this dht
assert.Error(t, err)

// create second dht
h2, _, dht2 := qgbtesting.NewTestDHT(ctx)
defer dht2.Close()
// connect to first dht
err = h2.Connect(ctx, peer.AddrInfo{
ID: h1.ID(),
Addrs: h1.Addrs(),
})
require.NoError(t, err)

// wait for peers
err = dht1.WaitForPeers(ctx, 10*time.Millisecond, time.Millisecond, 1)
assert.NoError(t, err)
}
2 changes: 2 additions & 0 deletions p2p/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package p2p
import "errors"

var (
ErrPeersTimeout = errors.New("timeout while waiting for peers")
ErrPeersThresholdCannotBeNegative = errors.New("peers threshold cannot be negative")
ErrNilPrivateKey = errors.New("private key cannot be nil")
ErrNotEnoughValsetConfirms = errors.New("couldn't find enough valset confirms")
ErrNotEnoughDataCommitmentConfirms = errors.New("couldn't find enough data commitment confirms")
Expand Down
4 changes: 3 additions & 1 deletion testing/dht_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

tmlog "github.com/tendermint/tendermint/libs/log"

"github.com/celestiaorg/orchestrator-relayer/p2p"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -68,7 +70,7 @@ func NewTestDHT(ctx context.Context) (host.Host, ds.Batching, *p2p.QgbDHT) {
panic(err)
}
dataStore := dssync.MutexWrap(ds.NewMapDatastore())
dht, err := p2p.NewQgbDHT(ctx, h, dataStore)
dht, err := p2p.NewQgbDHT(ctx, h, dataStore, tmlog.NewNopLogger())
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 7f8bb72

Please sign in to comment.