# Merge #94165 #96458
94165: kv: integrate raft async storage writes r=nvanbenschoten a=nvanbenschoten

Fixes #17500.
Epic: CRDB-22644

This commit integrates with the `AsyncStorageWrites` functionality that we added to Raft in etcd-io/raft/pull/8. 
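
For readers unfamiliar with the new Raft API, the sketch below shows the opt-in knob and what it changes about the `Ready()` contract. It is a minimal, hypothetical setup against `go.etcd.io/raft/v3`, not CockroachDB's actual Raft initialization code.

```go
package example

import raft "go.etcd.io/raft/v3"

// startAsyncNode sketches the AsyncStorageWrites opt-in. With the flag set,
// Ready() no longer asks the caller to synchronously persist Entries and
// apply CommittedEntries; instead it emits MsgStorageAppend and
// MsgStorageApply messages addressed to local storage threads, whose
// response messages are fed back into the node via Step().
func startAsyncNode() raft.Node {
	cfg := &raft.Config{
		ID:                 1,
		ElectionTick:       10,
		HeartbeatTick:      1,
		Storage:            raft.NewMemoryStorage(),
		MaxSizePerMsg:      1 << 20,
		MaxInflightMsgs:    256,
		AsyncStorageWrites: true,
	}
	return raft.StartNode(cfg, []raft.Peer{{ID: 1}})
}
```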

## Approach

The commit makes the minimal changes needed to integrate with async storage writes and pull fsyncs out of the raft state machine loop. It does not make an effort to extract the non-durable portion of raft log writes or raft log application onto separate goroutine pools, as was described in #17500. Those changes would also be impactful, but they are non-trivial and bump into a pipelining vs. batching trade-off, so they are left as future work items. See #94853 and #94854.

With this change, asynchronous Raft log syncs are enabled by the new `DB.ApplyNoSyncWait` Pebble API introduced in cockroachdb/pebble/pull/2117. The `handleRaftReady` state machine loop continues to initiate Raft log writes, but it uses the Pebble API to offload waiting on durability to a separate goroutine. When the fsync completes, this separate goroutine sends the corresponding `MsgStorageAppend` response messages where they need to go (locally and/or to the Raft leader). The async storage writes functionality in Raft makes this all safe.
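
The handoff can be sketched as follows, assuming Pebble's `ApplyNoSyncWait`/`SyncWait` API from that PR; `appendLogEntries`, `kvPair`, and `onDurable` are illustrative stand-ins, not actual CockroachDB identifiers.

```go
package example

import "github.com/cockroachdb/pebble"

// kvPair is an illustrative stand-in for an encoded raft log entry.
type kvPair struct{ key, value []byte }

// appendLogEntries starts a synced write of raft log entries but returns
// before the fsync completes; a separate goroutine waits for durability.
// onDurable stands in for the logic that routes the MsgStorageAppend
// responses.
func appendLogEntries(db *pebble.DB, entries []kvPair, onDurable func(error)) error {
	b := db.NewBatch()
	for _, e := range entries {
		if err := b.Set(e.key, e.value, nil); err != nil {
			_ = b.Close()
			return err
		}
	}
	// Queue the write with a sync requested, but do not block on the WAL fsync.
	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
		_ = b.Close()
		return err
	}
	go func() {
		defer b.Close()
		// Blocks until the batch is durable in the WAL, then delivers the
		// response messages (locally and/or to the Raft leader).
		onDurable(b.SyncWait())
	}()
	return nil
}
```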

## Benchmark Results

This change reduces interference between Raft proposals and, as a result, reduces end-to-end commit latency.

etcd-io/raft/pull/8 presented a collection of benchmark results captured from integrating async storage writes with rafttoy.

When integrated into CockroachDB, we see similar improvements to average and tail latency. However, the change does not yet provide throughput improvements at the top end, because log appends and state machine application have not been extracted onto separate goroutine pools, which would open up more opportunity for batching.

To visualize the impact on latency, consider the following test. The experiment uses a 3-node GCP cluster with n2-standard-32 instances spread across three availability zones. It runs kv0 (write-only) against the cluster with 64-byte values. It then ramps up concurrency to compare throughput vs. average and tail latency.

_NOTE: log scales on x and y axes_

![Throughput vs  average latency of write-only workload](https://user-images.githubusercontent.com/5438456/209210719-bec842f6-1093-48cd-8be7-05a3d79c2a71.svg)

![Throughput vs  tail latency of write-only workload](https://user-images.githubusercontent.com/5438456/209210777-670a4d25-9516-41a2-b7e7-86b402004536.svg)

Async storage writes affect latency by different amounts at different throughputs, ranging from an improvement of 20% to 40% when the system is "well utilized" to a regression of 5% to 10% when the system is over-saturated and CPU bound. The regression is presumably due to the extra goroutine handoff to the log append fsync callback, which is exposed to elevated goroutine scheduling latency.

| Throughput (data rate) | Throughput (qps) | Avg. Latency Δ | p99 Latency Δ |
| ---------------- | ---------------- | -------------- | ------------- |
| 63  KB/s         | 1,000            | -10.5%         | -8.8%         |
| 125 KB/s         | 2,000            | -7.1%          | -10.4%        |
| 250 KB/s         | 4,000            | -20%           | -11.2%        |
| 500 KB/s         | 8,000            | -16.6%         | -25.3%        |
| 1 MB/s           | 16,000           | -30.8%         | -44.0%        |
| 2 MB/s           | 32,000           | -38.2%         | -30.9%        |
| 4 MB/s           | 64,000           | +5.9%          | +9.4%         |

### Other benchmark results
```
name                   old ops/s    new ops/s    delta
# 50% read, 50% update
ycsb/A/nodes=3          16.0k ± 5%   16.2k ± 4%     ~     (p=0.353 n=10+10)
ycsb/A/nodes=3/cpu=32   28.7k ± 5%   33.8k ± 2%  +17.57%  (p=0.000 n=9+9)
# 95% read, 5% update
ycsb/B/nodes=3          29.9k ± 3%   30.2k ± 3%     ~     (p=0.278 n=9+10)
ycsb/B/nodes=3/cpu=32    101k ± 1%    100k ± 3%     ~     (p=0.274 n=8+10)
# 100% read
ycsb/C/nodes=3          40.4k ± 3%   40.0k ± 3%     ~     (p=0.190 n=10+10)
ycsb/C/nodes=3/cpu=32    135k ± 1%    137k ± 1%   +0.87%  (p=0.011 n=9+9)
# 95% read, 5% insert
ycsb/D/nodes=3          33.6k ± 3%   33.8k ± 3%     ~     (p=0.315 n=10+10)
ycsb/D/nodes=3/cpu=32    108k ± 1%    106k ± 6%     ~     (p=0.739 n=10+10)
# 95% scan, 5% insert
ycsb/E/nodes=3          3.79k ± 1%   3.73k ± 1%   -1.42%  (p=0.000 n=9+9)
ycsb/E/nodes=3/cpu=32   6.31k ± 5%   6.48k ± 6%     ~     (p=0.123 n=10+10)
# 50% read, 50% read-modify-write
ycsb/F/nodes=3          7.68k ± 2%   7.99k ± 2%   +4.11%  (p=0.000 n=10+10)
ycsb/F/nodes=3/cpu=32   15.6k ± 4%   18.1k ± 3%  +16.14%  (p=0.000 n=8+10)

name                   old avg(ms)  new avg(ms)  delta
ycsb/A/nodes=3           6.01 ± 5%    5.95 ± 4%     ~     (p=0.460 n=10+10)
ycsb/A/nodes=3/cpu=32    5.01 ± 4%    4.25 ± 4%  -15.19%  (p=0.000 n=9+10)
ycsb/B/nodes=3           4.80 ± 0%    4.77 ± 4%     ~     (p=0.586 n=7+10)
ycsb/B/nodes=3/cpu=32    1.90 ± 0%    1.90 ± 0%     ~     (all equal)
ycsb/C/nodes=3           3.56 ± 2%    3.61 ± 3%     ~     (p=0.180 n=10+10)
ycsb/C/nodes=3/cpu=32    1.40 ± 0%    1.40 ± 0%     ~     (all equal)
ycsb/D/nodes=3           2.87 ± 2%    2.85 ± 2%     ~     (p=0.650 n=10+10)
ycsb/D/nodes=3/cpu=32    1.30 ± 0%    1.34 ± 4%     ~     (p=0.087 n=10+10)
ycsb/E/nodes=3           25.3 ± 0%    25.7 ± 1%   +1.38%  (p=0.000 n=8+8)
ycsb/E/nodes=3/cpu=32    22.9 ± 5%    22.2 ± 6%     ~     (p=0.109 n=10+10)
ycsb/F/nodes=3           12.5 ± 2%    12.0 ± 1%   -3.72%  (p=0.000 n=10+9)
ycsb/F/nodes=3/cpu=32    9.27 ± 4%    7.98 ± 3%  -13.96%  (p=0.000 n=8+10)

name                   old p99(ms)  new p99(ms)  delta
ycsb/A/nodes=3           45.7 ±15%    35.7 ± 6%  -21.90%  (p=0.000 n=10+8)
ycsb/A/nodes=3/cpu=32    67.6 ±13%    55.3 ± 5%  -18.10%  (p=0.000 n=9+10)
ycsb/B/nodes=3           30.5 ±24%    29.4 ± 7%     ~     (p=0.589 n=10+10)
ycsb/B/nodes=3/cpu=32    12.8 ± 2%    13.3 ± 7%     ~     (p=0.052 n=10+8)
ycsb/C/nodes=3           14.0 ± 3%    14.2 ± 0%     ~     (p=0.294 n=10+8)
ycsb/C/nodes=3/cpu=32    5.80 ± 0%    5.70 ± 5%     ~     (p=0.233 n=7+10)
ycsb/D/nodes=3           12.4 ± 2%    11.7 ± 3%   -5.32%  (p=0.001 n=10+10)
ycsb/D/nodes=3/cpu=32    6.30 ± 0%    5.96 ± 6%   -5.40%  (p=0.001 n=10+10)
ycsb/E/nodes=3           81.0 ± 4%    83.9 ± 0%   +3.63%  (p=0.012 n=10+7)
ycsb/E/nodes=3/cpu=32     139 ±19%     119 ±12%  -14.46%  (p=0.021 n=10+10)
ycsb/F/nodes=3            122 ±17%     103 ±10%  -15.48%  (p=0.002 n=10+8)
ycsb/F/nodes=3/cpu=32     146 ±20%     133 ± 7%   -8.89%  (p=0.025 n=10+10)
```

The way to interpret these results is that async Raft storage writes reduce latency and, because of the closed-loop nature of the workload, also increase throughput for the YCSB variants that perform writes and are not already CPU-saturated. Read-only variants are unaffected. CPU-saturated variants do not benefit from the change because they are already bottlenecked on CPU resources and cannot push any more load (see above).

----

Release note (performance improvement): The Raft proposal pipeline has been optimized to reduce interference between Raft proposals. This improves average and tail write latency at high concurrency.

96458: sql: fixes statement contention count metric r=j82w a=j82w

Fixes a bug introduced in #94750 where the metric was counting transactions that hit contention events instead of counting statements that hit contention events.
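
In shape, the fix amounts to counting at the statement level rather than the transaction level; a hypothetical sketch with illustrative types (the real change lives in the SQL stats code):

```go
package example

// stmtStats is an illustrative stand-in for per-statement execution stats.
type stmtStats struct {
	contentionEvents int // contention events this statement hit
}

// contendedStatementCount returns the delta for the contention count
// metric: one per statement that hit contention, rather than one per
// transaction that contained any contended statement (the old, buggy
// behavior).
func contendedStatementCount(txnStmts []stmtStats) int64 {
	var n int64
	for _, s := range txnStmts {
		if s.contentionEvents > 0 {
			n++
		}
	}
	return n
}
```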

closes: #96429

Release note: none

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: j82w <[email protected]>
3 people committed Feb 3, 2023
3 parents be08998 + ebc33fa + 7fb9f49 commit 1708ea4
Showing 21 changed files with 776 additions and 444 deletions.
pkg/kv/kvserver/apply/doc_test.go (2 changes: 1 addition & 1 deletion)

@@ -45,7 +45,7 @@ Setting up a batch of seven log entries:
 	}
 
 	fmt.Println("\nAckCommittedEntriesBeforeApplication:")
-	if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
+	if err := t.AckCommittedEntriesBeforeApplication(ctx); err != nil {
 		panic(err)
 	}
 	fmt.Print(`

pkg/kv/kvserver/apply/task.go (13 changes: 2 additions & 11 deletions)

@@ -197,13 +197,7 @@ func (t *Task) assertDecoded() {
 // it is applied. Because of this, the client can be informed of the success of
 // a write at this point, but we cannot release that write's latches until the
 // write has applied. See ProposalData.signalProposalResult/finishApplication.
-//
-// 4. Note that when catching up a follower that is behind, the (etcd/raft)
-// leader will emit an MsgApp with a commit index that encompasses the entries
-// in the MsgApp, and Ready() will expose these as present in both the Entries
-// and CommittedEntries slices (i.e. append and apply). We don't ack these
-// early - the caller will pass the "old" last index in.
-func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
+func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context) error {
 	t.assertDecoded()
 	if !t.anyLocal {
 		return nil // fast-path
@@ -218,11 +212,8 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
 	defer iter.Close()
 
 	// Collect a batch of trivial commands from the applier. Stop at the first
-	// non-trivial command or at the first command with an index above maxIndex.
+	// non-trivial command.
 	batchIter := takeWhileCmdIter(iter, func(cmd Command) bool {
-		if cmd.Index() > maxIndex {
-			return false
-		}
 		return cmd.IsTrivial()
 	})
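
The surviving predicate is a plain take-while over the decoded commands. A simplified sketch of the pattern with illustrative types, not the actual `apply` package interfaces; the `maxIndex` cutoff is gone because the integration above makes the guard described in the removed comment unnecessary:

```go
package example

// command is an illustrative stand-in for the apply package's Command.
type command struct {
	trivial bool
}

// trivialPrefix mirrors the new takeWhileCmdIter predicate: commands are
// eligible for early acknowledgment up to the first non-trivial command,
// with no maxIndex cutoff.
func trivialPrefix(cmds []command) []command {
	for i, c := range cmds {
		if !c.trivial {
			return cmds[:i]
		}
	}
	return cmds
}
```
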
pkg/kv/kvserver/apply/task_test.go (36 changes: 1 addition & 35 deletions)

@@ -317,7 +317,7 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
 	appT := apply.MakeTask(sm, dec)
 	defer appT.Close()
 	require.NoError(t, appT.Decode(ctx, ents))
-	require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */))
+	require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx))
 
 	// Assert that the state machine was not updated.
 	require.Equal(t, testStateMachine{}, *sm)
@@ -338,40 +338,6 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
 		require.Equal(t, exp, cmd.acked)
 		require.False(t, cmd.finished)
 	}
-
-	// Try again with a lower maximum log index.
-	appT.Close()
-	ents = makeEntries(5)
-
-	dec = newTestDecoder()
-	dec.nonLocal[2] = true
-	dec.shouldReject[3] = true
-
-	appT = apply.MakeTask(sm, dec)
-	require.NoError(t, appT.Decode(ctx, ents))
-	require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */))
-
-	// Assert that the state machine was not updated.
-	require.Equal(t, testStateMachine{}, *sm)
-
-	// Assert that some commands were acknowledged early and that none were finished.
-	for _, cmd := range dec.cmds {
-		var exp bool
-		switch cmd.index {
-		case 1, 4:
-			exp = true // local and successful
-		case 2:
-			exp = false // remote
-		case 3:
-			exp = false // local and rejected
-		case 5:
-			exp = false // index too high
-		default:
-			t.Fatalf("unexpected index %d", cmd.index)
-		}
-		require.Equal(t, exp, cmd.acked)
-		require.False(t, cmd.finished)
-	}
 }
 
 func TestApplyCommittedEntriesWithErr(t *testing.T) {

pkg/kv/kvserver/helpers_test.go (2 changes: 1 addition & 1 deletion)

@@ -356,7 +356,7 @@ func (r *Replica) GetRaftLogSize() (int64, bool) {
 func (r *Replica) GetCachedLastTerm() uint64 {
 	r.mu.RLock()
 	defer r.mu.RUnlock()
-	return r.mu.lastTerm
+	return r.mu.lastTermNotDurable
 }
 
 func (r *Replica) IsRaftGroupInitialized() bool {

pkg/kv/kvserver/logstore/BUILD.bazel (6 changes: 6 additions & 0 deletions)

@@ -8,6 +8,7 @@ go_library(
         "sideload.go",
         "sideload_disk.go",
         "stateloader.go",
+        "sync_waiter.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore",
     visibility = ["//visibility:public"],
@@ -22,15 +23,18 @@ go_library(
         "//pkg/storage",
         "//pkg/storage/enginepb",
         "//pkg/storage/fs",
+        "//pkg/util/buildutil",
         "//pkg/util/envutil",
         "//pkg/util/hlc",
         "//pkg/util/iterutil",
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/protoutil",
+        "//pkg/util/stop",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_errors//oserror",
+        "@com_github_cockroachdb_pebble//record",
         "@com_github_cockroachdb_redact//:redact",
         "@io_etcd_go_raft_v3//:raft",
         "@io_etcd_go_raft_v3//raftpb",
@@ -43,6 +47,7 @@ go_test(
     srcs = [
         "logstore_bench_test.go",
         "sideload_test.go",
+        "sync_waiter_test.go",
     ],
     args = ["-test.timeout=295s"],
     embed = [":logstore"],
@@ -60,6 +65,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/protoutil",
+        "//pkg/util/stop",
         "//pkg/util/tracing",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_errors//oserror",
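
The new `sync_waiter.go` registered above is not shown in this excerpt. A plausible shape for such a component, sketched with illustrative types: a single long-lived goroutine that waits for each log write's fsync in submission order and then runs its callback.

```go
package example

// syncer is a stand-in for the handle returned by a non-blocking synced
// write, e.g. a pebble.Batch after DB.ApplyNoSyncWait.
type syncer interface {
	SyncWait() error
	Close() error
}

// syncWait pairs a pending write with the callback to run once it is durable.
type syncWait struct {
	s  syncer
	cb func() // e.g. deliver MsgStorageAppend responses
}

// runSyncWaiter blocks on each fsync in FIFO order and then invokes the
// callback. FIFO matches WAL submission order, so callbacks fire in log
// order, and only this one goroutine ever blocks on disk syncs.
func runSyncWaiter(q <-chan syncWait) {
	for w := range q {
		if err := w.s.SyncWait(); err != nil {
			panic(err) // in practice a failed raft log sync is fatal
		}
		w.cb()
		_ = w.s.Close()
	}
}
```
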
(Diffs for the remaining 16 changed files are not shown.)