From 3a2d5c8fcae5921afbb764b66faec47083a0b627 Mon Sep 17 00:00:00 2001
From: Shailendra Patel
Date: Wed, 28 Aug 2024 11:08:38 +0530
Subject: [PATCH] roachtest: add operation for pausing ldr job.

On DRT clusters we run chaos operations. This change adds a new
operation that pauses a running LDR job for some time. This will help
uncover issues such as replication lag and show how long LDR takes to
recover from it.

Epic: none
Release note: None
---
 pkg/cmd/roachtest/operations/BUILD.bazel      |   1 +
 pkg/cmd/roachtest/operations/pause_job.go     | 107 +++++++++++++++++++
 pkg/cmd/roachtest/operations/register.go      |   1 +
 pkg/cmd/roachtest/registry/operation_spec.go  |   1 +
 .../roachtestutil/operations/dependency.go    |  14 +++
 5 files changed, 124 insertions(+)
 create mode 100644 pkg/cmd/roachtest/operations/pause_job.go

diff --git a/pkg/cmd/roachtest/operations/BUILD.bazel b/pkg/cmd/roachtest/operations/BUILD.bazel
index 39fc2129f3f4..280287b7d127 100644
--- a/pkg/cmd/roachtest/operations/BUILD.bazel
+++ b/pkg/cmd/roachtest/operations/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
         "manual_compaction.go",
         "network_partition.go",
         "node_kill.go",
+        "pause_job.go",
         "register.go",
         "resize.go",
         "utils.go",
diff --git a/pkg/cmd/roachtest/operations/pause_job.go b/pkg/cmd/roachtest/operations/pause_job.go
new file mode 100644
index 000000000000..699119367863
--- /dev/null
+++ b/pkg/cmd/roachtest/operations/pause_job.go
@@ -0,0 +1,107 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package operations
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/operation"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestflags"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+)
+
+// resumePausedJob is the cleanup handle returned by pauseLDRJob. It records
+// the ID of the job that was paused so that Cleanup can resume it.
+type resumePausedJob struct {
+	jobID string
+}
+
+// Cleanup resumes the job that pauseLDRJob paused. It fails the operation via
+// o.Fatal if the RESUME JOB statement returns an error.
+func (r *resumePausedJob) Cleanup(ctx context.Context, o operation.Operation, c cluster.Cluster) {
+	conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
+	defer conn.Close()
+
+	resumeJobStmt := fmt.Sprintf("RESUME JOB %s", r.jobID)
+	if _, err := conn.ExecContext(ctx, resumeJobStmt); err != nil {
+		o.Fatal(err)
+	}
+}
+
+// pauseLDRJob pauses one randomly chosen running LOGICAL REPLICATION job and
+// returns a cleanup that resumes it. The operation is failed via o.Fatal if
+// no running LDR job is found or if any SQL statement returns an error.
+func pauseLDRJob(
+	ctx context.Context, o operation.Operation, c cluster.Cluster,
+) registry.OperationCleanup {
+	conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
+	defer conn.Close()
+
+	// Fetch the IDs of all running LDR jobs.
+	jobs, err := conn.QueryContext(ctx, "(WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'LOGICAL REPLICATION' AND status = 'running')")
+	if err != nil {
+		o.Fatal(err)
+	}
+	// Release the result set on every exit path.
+	defer jobs.Close()
+
+	var jobIDs []string
+	for jobs.Next() {
+		var jobID string
+		if err := jobs.Scan(&jobID); err != nil {
+			o.Fatal(err)
+		}
+		jobIDs = append(jobIDs, jobID)
+	}
+	// Next also returns false when iteration fails, so surface that error
+	// instead of treating a truncated result as complete.
+	if err := jobs.Err(); err != nil {
+		o.Fatal(err)
+	}
+
+	// The query may return no running job at all, and rng.Intn panics when
+	// its argument is zero. The explicit return keeps us away from that call
+	// no matter how o.Fatal unwinds.
+	if len(jobIDs) == 0 {
+		o.Fatal("no running LDR job found to pause")
+		return nil
+	}
+
+	// Pick a random LDR job.
+	rng, _ := randutil.NewPseudoRand()
+	jobID := jobIDs[rng.Intn(len(jobIDs))]
+
+	o.Status(fmt.Sprintf("pausing LDR job %s", jobID))
+	pauseJobStmt := fmt.Sprintf("PAUSE JOB %s WITH REASON = 'roachtest operation'", jobID)
+	if _, err := conn.ExecContext(ctx, pauseJobStmt); err != nil {
+		o.Fatal(err)
+	}
+
+	o.Status(fmt.Sprintf("paused LDR job %s", jobID))
+	return &resumePausedJob{jobID: jobID}
+}
+
+// registerPauseLDRJob adds the pause-ldr operation to the given registry.
+func registerPauseLDRJob(r registry.Registry) {
+	r.AddOperation(registry.OperationSpec{
+		Name:             "pause-ldr",
+		Owner:            registry.OwnerDisasterRecovery,
+		Timeout:          15 * time.Minute,
+		CompatibleClouds: registry.AllClouds,
+		Dependencies:     []registry.OperationDependency{registry.OperationRequiresLDRJobRunning},
+		Run:              pauseLDRJob,
+	})
+}
diff --git a/pkg/cmd/roachtest/operations/register.go b/pkg/cmd/roachtest/operations/register.go
index 7ba9d91b9e64..712f1664028c 100644
--- a/pkg/cmd/roachtest/operations/register.go
+++ b/pkg/cmd/roachtest/operations/register.go
@@ -23,4 +23,5 @@ func RegisterOperations(r registry.Registry) {
 	registerBackupRestore(r)
 	registerManualCompaction(r)
 	registerResize(r)
+	registerPauseLDRJob(r)
 }
diff --git a/pkg/cmd/roachtest/registry/operation_spec.go b/pkg/cmd/roachtest/registry/operation_spec.go
index d83da9a2bf03..dc37b6d710c3 100644
--- a/pkg/cmd/roachtest/registry/operation_spec.go
+++ b/pkg/cmd/roachtest/registry/operation_spec.go
@@ -30,6 +30,7 @@ const (
 	OperationRequiresPopulatedDatabase
 	OperationRequiresZeroUnavailableRanges
 	OperationRequiresZeroUnderreplicatedRanges
+	OperationRequiresLDRJobRunning
 )
 
 // OperationCleanup specifies an operation that
diff --git a/pkg/cmd/roachtest/roachtestutil/operations/dependency.go b/pkg/cmd/roachtest/roachtestutil/operations/dependency.go
index 07a273b0a4d8..b503ab77218d 100644
--- a/pkg/cmd/roachtest/roachtestutil/operations/dependency.go
+++ b/pkg/cmd/roachtest/roachtestutil/operations/dependency.go
@@ -80,6 +80,20 @@ func CheckDependencies(
 			if count != 0 {
 				return false, nil
 			}
+		case registry.OperationRequiresLDRJobRunning:
+			conn := c.Conn(ctx, l, 1, option.VirtualClusterName("system"))
+			defer conn.Close()
+
+			jobsCur, err := conn.QueryContext(ctx, "(WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'LOGICAL REPLICATION' AND status = 'running' limit 1)")
+			if err != nil {
+				return false, err
+			}
+			defer jobsCur.Close()
+			// No row means no running LDR job. Next also returns false on an
+			// iteration error, which Err reports (nil otherwise).
+			if !jobsCur.Next() {
+				return false, jobsCur.Err()
+			}
 		default:
 			panic(fmt.Sprintf("unknown operation dependency %d", dep))
 		}