90765: roachtest: add kv0 c2c roachtest r=stevendanna a=stevendanna

This adds a simple roachtest that runs kv0 during tenant replication. We plan to expand this test over time, but this one can serve as a basic example of how to get everything wired together.

Epic: CRDB-14682

Release note: None
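
For illustration, a minimal sketch of how such a test might be wired into the roachtest framework follows. The test name, owner, cluster size, and workload invocation below are assumptions made for the example; the real test lives in the new cluster_to_cluster.go and performs the tenant-replication setup that is only hinted at in the comments.

package tests

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
)

// registerC2CKV0Sketch is a hypothetical registration function; the name,
// owner, and cluster layout are illustrative only.
func registerC2CKV0Sketch(r registry.Registry) {
	r.Add(registry.TestSpec{
		Name:    "c2c/kv0-sketch",
		Owner:   registry.OwnerDisasterRecovery,
		Cluster: r.MakeClusterSpec(4),
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			c.Put(ctx, t.Cockroach(), "./cockroach")
			c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings())
			// Tenant creation and replication-stream setup omitted in this sketch.
			c.Run(ctx, c.Node(4), "./cockroach workload init kv {pgurl:1}")
			// kv0 is the kv workload with a 0% read fraction.
			c.Run(ctx, c.Node(4), "./cockroach workload run kv --read-percent=0 --duration=10m {pgurl:1-3}")
		},
	})
}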

91603: execbuilder: skip flaky sql_activity_stats_compaction tests r=ericharmeling a=ericharmeling

This commit skips flaky sql_activity_stats_compaction execbuilder tests while we investigate the occasional test failures.

Part of #91600.

Release note: none
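
For reference, one common way to skip a flaky test in this codebase is the testutils/skip package, sketched below with the tracking issue from this PR. The test name here is hypothetical, and whether this commit skips the tests this way or instead disables the generated execbuilder configurations is not visible in this excerpt.

package execbuilder

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
)

// TestSQLActivityStatsCompactionSketch is a hypothetical test name used only
// to illustrate the skip pattern.
func TestSQLActivityStatsCompactionSketch(t *testing.T) {
	skip.WithIssue(t, 91600, "sql_activity_stats_compaction is flaky")
	// ... test body ...
}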

91608: sql: Cleanup constant name and comments r=miretskiy a=miretskiy

Clean up JSON parser comments and constants to use correct naming.

Epic: None

Release note: None

91660: kvserver: fix consistency check test flake r=erikgrinaker a=pavelkalinnikov

Since we no longer have GC for computation trackers, getChecksum would sometimes return with a timeout instead of returning early, because it could not find the tracker after a failed computation task had cleaned it up.

This commit fixes the test flake. Instead of trying to force one or the other behaviour, the test accepts both as valid.

Epic: None
Release note: None
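
A minimal, self-contained sketch of the "accept both outcomes" pattern described above follows; checksumResult and getChecksumStub are hypothetical stand-ins, since the real test in pkg/kv/kvserver is not shown in this excerpt.

package kvserver_test

import (
	"context"
	"errors"
	"testing"

	"github.com/stretchr/testify/require"
)

// checksumResult and getChecksumStub are hypothetical stand-ins for the real
// consistency-check response and request path.
type checksumResult struct{ checksum []byte }

func getChecksumStub(ctx context.Context) (checksumResult, error) {
	// In the real test this either returns early with no checksum or times
	// out once the failed computation task has removed its tracker.
	return checksumResult{}, context.DeadlineExceeded
}

func TestChecksumEitherOutcomeSketch(t *testing.T) {
	resp, err := getChecksumStub(context.Background())
	if err != nil {
		// A timeout is an acceptable outcome.
		require.True(t, errors.Is(err, context.DeadlineExceeded), "unexpected error: %v", err)
	} else {
		// An early return without a checksum is also acceptable.
		require.Empty(t, resp.checksum)
	}
}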

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Eric Harmeling <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
5 people committed Nov 10, 2022
5 parents 53e75b0 + 07ffd67 + ebb37e9 + 9be3877 + 70a5d0e commit cf2b7f8
Showing 10 changed files with 522 additions and 175 deletions.
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
"clock_monotonic.go",
"clock_util.go",
"cluster_init.go",
"cluster_to_cluster.go",
"connection_latency.go",
"copy.go",
"copyfrom.go",
@@ -74,6 +75,7 @@ go_library(
"knex.go",
"kv.go",
"kvbench.go",
"latency_verifier.go",
"ledger.go",
"libpq.go",
"libpq_blocklist.go",
@@ -192,6 +194,7 @@ go_library(
"//pkg/roachprod/install",
"//pkg/roachprod/logger",
"//pkg/roachprod/prometheus",
"//pkg/roachprod/vm/local",
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
@@ -209,6 +212,7 @@ go_library(
"//pkg/util/cancelchecker",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
"//pkg/util/log",
159 changes: 13 additions & 146 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -43,14 +43,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/codahale/hdrhistogram"
)

type workloadType string
@@ -199,9 +196,11 @@ func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcT
}
defer changefeedLogger.Close()
verifier := makeLatencyVerifier(
"changefeed",
args.targetInitialScanLatency,
args.targetSteadyLatency,
changefeedLogger,
func(db *gosql.DB, jobID int) (jobInfo, error) { return getChangefeedInfo(db, jobID) },
t.Status,
args.crdbChaos,
)
@@ -1630,144 +1629,6 @@ func (lw *ledgerWorkload) run(ctx context.Context, c cluster.Cluster, workloadDu
))
}

type latencyVerifier struct {
statementTime time.Time
targetSteadyLatency time.Duration
targetInitialScanLatency time.Duration
tolerateErrors bool
logger *logger.Logger
setTestStatus func(...interface{})

initialScanLatency time.Duration
maxSeenSteadyLatency time.Duration
maxSeenSteadyEveryN log.EveryN
latencyBecameSteady bool

latencyHist *hdrhistogram.Histogram
}

func makeLatencyVerifier(
targetInitialScanLatency time.Duration,
targetSteadyLatency time.Duration,
l *logger.Logger,
setTestStatus func(...interface{}),
tolerateErrors bool,
) *latencyVerifier {
const sigFigs, minLatency, maxLatency = 1, 100 * time.Microsecond, 100 * time.Second
hist := hdrhistogram.New(minLatency.Nanoseconds(), maxLatency.Nanoseconds(), sigFigs)
return &latencyVerifier{
targetInitialScanLatency: targetInitialScanLatency,
targetSteadyLatency: targetSteadyLatency,
logger: l,
setTestStatus: setTestStatus,
latencyHist: hist,
tolerateErrors: tolerateErrors,
maxSeenSteadyEveryN: log.Every(10 * time.Second),
}
}

func (lv *latencyVerifier) noteHighwater(highwaterTime time.Time) {
if highwaterTime.Before(lv.statementTime) {
return
}
if lv.initialScanLatency == 0 {
lv.initialScanLatency = timeutil.Since(lv.statementTime)
lv.logger.Printf("initial scan completed: latency %s\n", lv.initialScanLatency)
return
}

latency := timeutil.Since(highwaterTime)
if latency < lv.targetSteadyLatency/2 {
lv.latencyBecameSteady = true
}
if !lv.latencyBecameSteady {
// Before we have RangeFeed, the polls just get
// progressively smaller after the initial one. Start
// tracking the max latency once we've seen a latency
// that's less than the max allowed. Verify at the end
// of the test that this happens at some point.
if lv.maxSeenSteadyEveryN.ShouldLog() {
lv.setTestStatus(fmt.Sprintf(
"watching changefeed: end-to-end latency %s not yet below target steady latency %s",
latency.Truncate(time.Millisecond), lv.targetSteadyLatency.Truncate(time.Millisecond)))
}
return
}
if err := lv.latencyHist.RecordValue(latency.Nanoseconds()); err != nil {
lv.logger.Printf("could not record value %s: %s\n", latency, err)
}
if latency > lv.maxSeenSteadyLatency {
lv.maxSeenSteadyLatency = latency
}
if lv.maxSeenSteadyEveryN.ShouldLog() {
lv.setTestStatus(fmt.Sprintf(
"watching changefeed: end-to-end steady latency %s; max steady latency so far %s",
latency.Truncate(time.Millisecond), lv.maxSeenSteadyLatency.Truncate(time.Millisecond)))
}
}

func (lv *latencyVerifier) pollLatency(
ctx context.Context, db *gosql.DB, jobID int, interval time.Duration, stopper chan struct{},
) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-stopper:
return nil
case <-time.After(time.Second):
}

info, err := getChangefeedInfo(db, jobID)
if err != nil {
if lv.tolerateErrors {
lv.logger.Printf("error getting changefeed info: %s", err)
continue
}
return err
}
if info.status != `running` {
lv.logger.Printf("unexpected status: %s, error: %s", info.status, info.errMsg)
return errors.Errorf(`unexpected status: %s`, info.status)
}
lv.noteHighwater(info.highwaterTime)
}
}

func (lv *latencyVerifier) assertValid(t test.Test) {
if lv.initialScanLatency == 0 {
t.Fatalf("initial scan did not complete")
}
if lv.initialScanLatency > lv.targetInitialScanLatency {
t.Fatalf("initial scan latency was more than target: %s vs %s",
lv.initialScanLatency, lv.targetInitialScanLatency)
}
if !lv.latencyBecameSteady {
t.Fatalf("latency never dropped to acceptable steady level: %s", lv.targetSteadyLatency)
}
if lv.maxSeenSteadyLatency > lv.targetSteadyLatency {
t.Fatalf("max latency was more than allowed: %s vs %s",
lv.maxSeenSteadyLatency, lv.targetSteadyLatency)
}
}

func (lv *latencyVerifier) maybeLogLatencyHist() {
if lv.latencyHist == nil {
return
}
lv.logger.Printf(
"changefeed end-to-end __avg(ms)__p50(ms)__p75(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)\n")
lv.logger.Printf("changefeed end-to-end %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n",
time.Duration(lv.latencyHist.Mean()).Seconds()*1000,
time.Duration(lv.latencyHist.ValueAtQuantile(50)).Seconds()*1000,
time.Duration(lv.latencyHist.ValueAtQuantile(75)).Seconds()*1000,
time.Duration(lv.latencyHist.ValueAtQuantile(90)).Seconds()*1000,
time.Duration(lv.latencyHist.ValueAtQuantile(95)).Seconds()*1000,
time.Duration(lv.latencyHist.ValueAtQuantile(99)).Seconds()*1000,
time.Duration(lv.latencyHist.ValueAtQuantile(100)).Seconds()*1000,
)
}

type cdcOption struct {
option string
value string
@@ -1848,29 +1709,35 @@ type changefeedInfo struct {
highwaterTime time.Time
}

func getChangefeedInfo(db *gosql.DB, jobID int) (changefeedInfo, error) {
func (c *changefeedInfo) GetHighWater() time.Time { return c.highwaterTime }
func (c *changefeedInfo) GetStatus() string { return c.status }
func (c *changefeedInfo) GetError() string { return c.errMsg }

var _ jobInfo = (*changefeedInfo)(nil)
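
// Not part of this file's diff: the jobInfo interface asserted above is
// defined in the new latency_verifier.go, which is not shown in this excerpt.
// Judging from the changefeedInfo methods, it presumably looks roughly like:
//
//	type jobInfo interface {
//		GetHighWater() time.Time
//		GetStatus() string
//		GetError() string
//	}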

func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) {
var status string
var payloadBytes []byte
var progressBytes []byte
if err := db.QueryRow(
`SELECT status, payload, progress FROM system.jobs WHERE id = $1`, jobID,
).Scan(&status, &payloadBytes, &progressBytes); err != nil {
return changefeedInfo{}, err
return nil, err
}
var payload jobspb.Payload
if err := protoutil.Unmarshal(payloadBytes, &payload); err != nil {
return changefeedInfo{}, err
return nil, err
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
return changefeedInfo{}, err
return nil, err
}
var highwaterTime time.Time
highwater := progress.GetHighWater()
if highwater != nil {
highwaterTime = highwater.GoTime()
}
return changefeedInfo{
return &changefeedInfo{
status: status,
errMsg: payload.Error,
statementTime: payload.GetChangefeed().StatementTime.GoTime(),