Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

More stats, knobs and tunings #514

Merged
merged 29 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3dbe5b8
feat: expose number of outstanding requests in stats
aschmahmann Jun 30, 2021
c557ca3
bump task workers
aschmahmann Jun 30, 2021
36a9802
expose more task worker options
aschmahmann Jul 1, 2021
ebdf046
feat: tighter send timeouts
Stebalien Jul 1, 2021
f2004d8
add option for maximum outstanding bytes per peer
aschmahmann Jul 1, 2021
5f43c5d
add prometheus metric for how long it takes to send messages
aschmahmann Jul 1, 2021
b82af74
make tests compile; make some tests pass
petar Jul 19, 2021
b0f9dfb
add defaults for knobs; all tests pass
petar Jul 20, 2021
68aca5c
bump go-peertaskqueue dependency
petar Jul 20, 2021
8a5dbcb
remove unused code
petar Jul 22, 2021
e4784ab
go mod tidy
petar Jul 22, 2021
86056b7
increase test worker count
petar Jul 22, 2021
d7a22eb
tune test knobs
petar Jul 22, 2021
0bb4888
test knobs
petar Jul 22, 2021
6ff26a9
test knobs
petar Jul 22, 2021
66c2800
bump go-peertaskqueue to v0.3.0
petar Jul 22, 2021
8035dfd
go mod tidy
petar Jul 22, 2021
7606648
add prometheus metrics
petar Jul 26, 2021
1eeb20c
add gauges for blockstore tasks
petar Aug 3, 2021
32a8c10
address comments
petar Aug 5, 2021
890a3f0
adjustments
petar Aug 5, 2021
7efcb76
go mod tidy
petar Aug 5, 2021
d969152
add defaults internal package
petar Aug 6, 2021
8b55e4a
remove jobs from stats structure
petar Aug 6, 2021
f982980
add constructors for testing
petar Aug 6, 2021
c531e67
fix race
petar Aug 6, 2021
8ebe2a0
move testing constructors
petar Aug 6, 2021
8a8d865
remove extraneous imports
aschmahmann Aug 18, 2021
d4a71a1
bump peertaskqueue
aschmahmann Aug 18, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
deciface "github.com/ipfs/go-bitswap/decision"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
"github.com/ipfs/go-bitswap/internal/decision"
"github.com/ipfs/go-bitswap/internal/defaults"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
"github.com/ipfs/go-bitswap/internal/notifications"
Expand Down Expand Up @@ -42,15 +43,6 @@ var sflog = log.Desugar()

var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second

// Number of concurrent workers in decision engine that process requests to the blockstore
defaulEngineBlockstoreWorkerCount = 128
)

var (
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
Expand All @@ -62,6 +54,8 @@ var (

// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}

timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
)

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -100,6 +94,36 @@ func EngineBlockstoreWorkerCount(count int) Option {
}
}

// EngineTaskWorkerCount returns an Option that records count as the number of
// task workers for the decision engine. It panics immediately (when the
// option is constructed, not when it is applied) if count is not positive.
func EngineTaskWorkerCount(count int) Option {
	if count > 0 {
		return func(b *Bitswap) { b.engineTaskWorkerCount = count }
	}
	panic(fmt.Sprintf("Engine task worker count is %d but must be > 0", count))
}

// TaskWorkerCount returns an Option that stores count in the Bitswap
// taskWorkerCount setting. It panics at construction time if count is not
// positive.
func TaskWorkerCount(count int) Option {
	if count > 0 {
		return func(b *Bitswap) { b.taskWorkerCount = count }
	}
	panic(fmt.Sprintf("task worker count is %d but must be > 0", count))
}

// MaxOutstandingBytesPerPeer returns an Option describing approximately how
// much work we are willing to have outstanding to a peer at any given time.
// Setting it to 0 will disable any limiting. It panics at construction time
// if count is negative.
func MaxOutstandingBytesPerPeer(count int) Option {
	if count >= 0 {
		return func(b *Bitswap) { b.engineMaxOutstandingBytesPerPeer = count }
	}
	panic(fmt.Sprintf("max outstanding bytes per peer is %d but must be >= 0", count))
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
Expand Down Expand Up @@ -147,6 +171,17 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)

sendTimeHistogram := metrics.NewCtx(ctx, "send_times", "Histogram of how long it takes to send messages"+
" in this bitswap").Histogram(timeMetricsBuckets)

pendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()

activeEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()

pendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()

activeBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()

px := process.WithTeardown(func() error {
return nil
})
Expand Down Expand Up @@ -192,26 +227,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())

bs = &Bitswap{
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
engineSetSendDontHaves: true,
simulateDontHavesOnTimeout: true,
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
sendTimeHistogram: sendTimeHistogram,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still useful? If it's not documented then it's not useful and we should remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is useful to know if your outbound connection is slow. Let's keep it. Where is the right place to document it? The metric itself is documented when initialized, so the doc shows up on the grafana dashboard.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, that seems fine. We should try and track this internally on our infra (and ask Pinata to try on theirs) so we can get an understanding of what these numbers look like and if the time resolution is anywhere close to correct.

provideEnabled: true,
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount,
engineTaskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
taskWorkerCount: defaults.BitswapTaskWorkerCount,
engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
engineSetSendDontHaves: true,
simulateDontHavesOnTimeout: true,
}

// apply functional options before starting and running bitswap
Expand All @@ -220,7 +259,20 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

// Set up decision engine
bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)
bs.engine = decision.NewEngine(
ctx,
bstore,
bs.engineBstoreWorkerCount,
bs.engineTaskWorkerCount,
bs.engineMaxOutstandingBytesPerPeer,
network.ConnectionManager(),
network.Self(),
bs.engineScoreLedger,
pendingEngineGauge,
activeEngineGauge,
pendingBlocksGauge,
activeBlocksGauge,
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

bs.pqm.Startup()
Expand Down Expand Up @@ -277,9 +329,10 @@ type Bitswap struct {
counters *counters

// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram
sendTimeHistogram metrics.Histogram

// External statistics interface
wiretap WireTap
Expand All @@ -303,6 +356,15 @@ type Bitswap struct {
// how many worker threads to start for decision engine blockstore worker
engineBstoreWorkerCount int

// how many worker threads to start for decision engine task worker
engineTaskWorkerCount int

// the total number of simultaneous threads sending outgoing messages
taskWorkerCount int

// the total number of bytes that a peer should have outstanding; used by the decision engine
engineMaxOutstandingBytesPerPeer int

// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger

Expand Down
6 changes: 5 additions & 1 deletion bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
bitswap.TaskWorkerCount(5),
bitswap.EngineTaskWorkerCount(5),
bitswap.MaxOutstandingBytesPerPeer(1 << 20),
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
})
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-peertaskqueue v0.4.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.14.3
Expand All @@ -28,6 +28,7 @@ require (
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multistream v0.2.2
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.16.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc=
github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI=
github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
33 changes: 24 additions & 9 deletions internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,36 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
)

// blockstoreManager maintains a pool of workers that make requests to the blockstore.
type blockstoreManager struct {
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
pendingGauge metrics.Gauge
activeGauge metrics.Gauge
}

// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
func newBlockstoreManager(
ctx context.Context,
bs bstore.Blockstore,
workerCount int,
pendingGauge metrics.Gauge,
activeGauge metrics.Gauge,
) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
px: process.WithTeardown(func() error { return nil }),
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
px: process.WithTeardown(func() error { return nil }),
pendingGauge: pendingGauge,
activeGauge: activeGauge,
}
}

Expand All @@ -46,7 +57,10 @@ func (bsm *blockstoreManager) worker(px process.Process) {
case <-px.Closing():
return
case job := <-bsm.jobs:
bsm.pendingGauge.Dec()
bsm.activeGauge.Inc()
job()
bsm.activeGauge.Dec()
}
}
}
Expand All @@ -58,6 +72,7 @@ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
case <-bsm.px.Closing():
return fmt.Errorf("shutting down")
case bsm.jobs <- job:
bsm.pendingGauge.Inc()
return nil
}
}
Expand Down
22 changes: 17 additions & 5 deletions internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-metrics-interface"

blocks "github.com/ipfs/go-block-format"
ds "github.com/ipfs/go-datastore"
Expand All @@ -19,13 +20,23 @@ import (
process "github.com/jbenet/goprocess"
)

// newBlockstoreManagerForTesting is a test-only constructor: it creates the
// pending and active blockstore-task gauges from ctx and hands them, together
// with bs and workerCount, to newBlockstoreManager.
func newBlockstoreManagerForTesting(
	ctx context.Context,
	bs blockstore.Blockstore,
	workerCount int,
) *blockstoreManager {
	// Arguments are evaluated left to right, so the pending gauge is still
	// created before the active gauge.
	return newBlockstoreManager(
		ctx,
		bs,
		workerCount,
		metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge(),
		metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge(),
	)
}

func TestBlockstoreManagerNotFoundKey(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(bstore, 5)
bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

cids := testutil.GenerateCids(4)
Expand Down Expand Up @@ -64,7 +75,7 @@ func TestBlockstoreManager(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(bstore, 5)
bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

exp := make(map[cid.Cid]blocks.Block)
Expand Down Expand Up @@ -148,7 +159,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

workerCount := 5
bsm := newBlockstoreManager(bstore, workerCount)
bsm := newBlockstoreManagerForTesting(ctx, bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))

blkSize := int64(8 * 1024)
Expand Down Expand Up @@ -190,7 +201,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(bstore, 3)
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)

Expand Down Expand Up @@ -229,7 +240,8 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
underlyingBstore := blockstore.NewBlockstore(underlyingDstore)
bstore := blockstore.NewBlockstore(dstore)

bsm := newBlockstoreManager(bstore, 3)
ctx := context.Background()
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)

Expand Down
Loading