Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add metrics to track snapshot queue size #100942

Merged
merged 1 commit into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,18 @@ var (
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotSendQueueSize = metric.Metadata{
Name: "range.snapshots.send-queue-bytes",
Help: "Total size of all snapshots in the snapshot send queue",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotRecvQueueSize = metric.Metadata{
Name: "range.snapshots.recv-queue-bytes",
Help: "Total size of all snapshots in the snapshot receive queue",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}

metaRangeRaftLeaderTransfers = metric.Metadata{
Name: "range.raftleadertransfers",
Expand Down Expand Up @@ -2046,6 +2058,8 @@ type StoreMetrics struct {
RangeSnapshotRecvInProgress *metric.Gauge
RangeSnapshotSendTotalInProgress *metric.Gauge
RangeSnapshotRecvTotalInProgress *metric.Gauge
RangeSnapshotSendQueueSize *metric.Gauge
RangeSnapshotRecvQueueSize *metric.Gauge

// Delegate snapshot metrics. These don't count self-delegated snapshots.
DelegateSnapshotSendBytes *metric.Counter
Expand Down Expand Up @@ -2645,6 +2659,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRecvInProgress: metric.NewGauge(metaRangeSnapshotRecvInProgress),
RangeSnapshotSendTotalInProgress: metric.NewGauge(metaRangeSnapshotSendTotalInProgress),
RangeSnapshotRecvTotalInProgress: metric.NewGauge(metaRangeSnapshotRecvTotalInProgress),
RangeSnapshotSendQueueSize: metric.NewGauge(metaRangeSnapshotSendQueueSize),
RangeSnapshotRecvQueueSize: metric.NewGauge(metaRangeSnapshotRecvQueueSize),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries),
DelegateSnapshotSendBytes: metric.NewCounter(metaDelegateSnapshotSendBytes),
Expand Down
53 changes: 39 additions & 14 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ var snapshotPrioritizationEnabled = settings.RegisterBoolSetting(
true,
)

// snapshotMetrics contains metrics on the number and size of snapshots in
// progress or in the snapshot queue.
type snapshotMetrics struct {
QueueLen *metric.Gauge
QueueSize *metric.Gauge
InProgress *metric.Gauge
TotalInProgress *metric.Gauge
}

// incomingSnapshotStream is the minimal interface on a GRPC stream required
// to receive a snapshot over the network.
type incomingSnapshotStream interface {
Expand Down Expand Up @@ -678,13 +687,19 @@ func (s *Store) reserveReceiveSnapshot(
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot")
defer sp.Finish()

return s.throttleSnapshot(ctx, s.snapshotApplyQueue,
int(header.SenderQueueName), header.SenderQueuePriority,
return s.throttleSnapshot(ctx,
s.snapshotApplyQueue,
int(header.SenderQueueName),
header.SenderQueuePriority,
-1,
header.RangeSize,
header.RaftMessageRequest.RangeID,
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
snapshotMetrics{
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvQueueSize,
s.metrics.RangeSnapshotRecvInProgress,
s.metrics.RangeSnapshotRecvTotalInProgress,
},
)
}

Expand All @@ -698,14 +713,19 @@ func (s *Store) reserveSendSnapshot(
fn()
}

return s.throttleSnapshot(ctx, s.snapshotSendQueue,
return s.throttleSnapshot(ctx,
s.snapshotSendQueue,
int(req.SenderQueueName),
req.SenderQueuePriority,
req.QueueOnDelegateLen,
rangeSize,
req.RangeID,
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
snapshotMetrics{
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendQueueSize,
s.metrics.RangeSnapshotSendInProgress,
s.metrics.RangeSnapshotSendTotalInProgress,
},
)
}

Expand All @@ -720,7 +740,7 @@ func (s *Store) throttleSnapshot(
maxQueueLength int64,
rangeSize int64,
rangeID roachpb.RangeID,
waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
snapshotMetrics snapshotMetrics,
) (cleanup func(), funcErr error) {

tBegin := timeutil.Now()
Expand All @@ -742,8 +762,13 @@ func (s *Store) throttleSnapshot(
}
}()

waitingSnapshotMetric.Inc(1)
defer waitingSnapshotMetric.Dec(1)
// Total bytes of snapshots waiting in the snapshot queue
snapshotMetrics.QueueSize.Inc(rangeSize)
defer snapshotMetrics.QueueSize.Dec(rangeSize)
// Total number of snapshots waiting in the snapshot queue
snapshotMetrics.QueueLen.Inc(1)
defer snapshotMetrics.QueueLen.Dec(1)

queueCtx := ctx
if deadline, ok := queueCtx.Deadline(); ok {
// Enforce a more strict timeout for acquiring the snapshot reservation to
Expand Down Expand Up @@ -778,10 +803,10 @@ func (s *Store) throttleSnapshot(
}

// Counts non-empty in-progress snapshots.
inProgressSnapshotMetric.Inc(1)
snapshotMetrics.InProgress.Inc(1)
}
// Counts all in-progress snapshots.
totalInProgressSnapshotMetric.Inc(1)
snapshotMetrics.TotalInProgress.Inc(1)

// The choice here is essentially arbitrary, but with a default range size of 128mb-512mb and the
// Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot.
Expand All @@ -804,10 +829,10 @@ func (s *Store) throttleSnapshot(
return func() {
s.metrics.ReservedReplicaCount.Dec(1)
s.metrics.Reserved.Dec(rangeSize)
totalInProgressSnapshotMetric.Dec(1)
snapshotMetrics.TotalInProgress.Dec(1)

if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
inProgressSnapshotMetric.Dec(1)
snapshotMetrics.InProgress.Dec(1)
snapshotQueue.Release(permit)
}
}, nil
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3157,7 +3157,7 @@ func TestReserveSnapshotThrottling(t *testing.T) {
s := tc.store

cleanupNonEmpty1, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
RangeSize: 1,
RangeSize: 10,
})
if err != nil {
t.Fatal(err)
Expand All @@ -3167,6 +3167,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand All @@ -3184,6 +3186,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(2), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand All @@ -3205,6 +3209,10 @@ func TestReserveSnapshotThrottling(t *testing.T) {
t.Errorf("unexpected snapshot queue length; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvQueueLength.Value())
}
if s.Metrics().RangeSnapshotRecvQueueSize.Value() != int64(10) {
t.Errorf("unexplected snapshot queue size; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvQueueSize.Value())
}
if s.Metrics().RangeSnapshotRecvInProgress.Value() != int64(1) {
t.Errorf("unexpected snapshots in progress; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvInProgress.Value())
Expand All @@ -3216,14 +3224,16 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}()

cleanupNonEmpty3, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
RangeSize: 1,
RangeSize: 10,
})
if err != nil {
t.Fatal(err)
}
atomic.StoreInt32(&boom, 1)
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand Down
7 changes: 7 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ var charts = []sectionDescription{
"range.snapshots.delegate.sent-bytes",
},
},
{
Title: "Snapshot Queue Bytes",
Metrics: []string{
"range.snapshots.send-queue-bytes",
"range.snapshots.recv-queue-bytes",
},
},
},
},
{
Expand Down