From 193d93f4232763dea6e2b07b6b49703e427fa3b0 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Wed, 5 Oct 2022 15:56:38 +0530
Subject: [PATCH 1/2] feat: call the logic for waiting for shard locks on synchronous on-term hooks and add tests for it

Signed-off-by: Manan Gupta
---
Reviewer notes (free text placed after the three-dash marker; not part of the
commit message or of the change itself):

* acceptSignals is split in two. SIGHUP handling stays in a goroutine
  (acceptSighupSignal); the former SIGTERM branch becomes closeVTOrc, which
  ContinuousDiscovery registers through servenv.OnTermSync.
* The signal.Notify registration for SIGTERM and the explicit os.Exit(0) are
  removed. NOTE(review): presumably servenv now owns termination signals and
  process exit; its code is not part of this patch, so confirm.
* waitForLocksRelease returns as soon as shardsLockCounter reads 0. Otherwise
  it re-checks every 50ms (the removed loop slept 100ms) until
  shutdownWaitTime elapses, logs the timeout and returns.
* closeVTOrc keeps the "Triggered via SIGTERM" audit text. NOTE(review):
  confirm that OnTermSync hooks run only for SIGTERM.
* The removed comment read "probably should poke other go routines ..."; the
  new wording drops the hedge, but the only action visible in closeVTOrc is
  still the store to hasReceivedSIGTERM.
* TestWaitForLocksRelease asserts wall-clock windows (100ms to 300ms) around
  real sleeps. NOTE(review): may be timing-sensitive on a loaded machine.

 go/vt/vtorc/logic/orchestrator.go      | 81 ++++++++++++++------------
 go/vt/vtorc/logic/orchestrator_test.go | 56 ++++++++++++++++++
 2 files changed, 99 insertions(+), 38 deletions(-)
 create mode 100644 go/vt/vtorc/logic/orchestrator_test.go

diff --git a/go/vt/vtorc/logic/orchestrator.go b/go/vt/vtorc/logic/orchestrator.go
index 827b40ebd96..e83bdc0e981 100644
--- a/go/vt/vtorc/logic/orchestrator.go
+++ b/go/vt/vtorc/logic/orchestrator.go
@@ -24,12 +24,12 @@ import (
 	"syscall"
 	"time"
 
-	"vitess.io/vitess/go/vt/log"
-
 	"github.com/patrickmn/go-cache"
 	"github.com/rcrowley/go-metrics"
 	"github.com/sjmudd/stopwatch"
 
+	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/vtorc/collection"
 	"vitess.io/vitess/go/vt/vtorc/config"
 	"vitess.io/vitess/go/vt/vtorc/discovery"
@@ -105,49 +105,52 @@ func instancePollSecondsDuration() time.Duration {
 	return time.Duration(config.Config.InstancePollSeconds) * time.Second
 }
 
-// acceptSignals registers for OS signals
-func acceptSignals() {
+// acceptSighupSignal registers for OS signals
+func acceptSighupSignal() {
 	c := make(chan os.Signal, 1)
 
 	signal.Notify(c, syscall.SIGHUP)
-	signal.Notify(c, syscall.SIGTERM)
 	go func() {
-		for sig := range c {
-			switch sig {
-			case syscall.SIGHUP:
-				log.Infof("Received SIGHUP. Reloading configuration")
-				_ = inst.AuditOperation("reload-configuration", nil, "Triggered via SIGHUP")
-				config.Reload()
-				discoveryMetrics.SetExpirePeriod(time.Duration(config.DiscoveryCollectionRetentionSeconds) * time.Second)
-			case syscall.SIGTERM:
-				log.Infof("Received SIGTERM. Starting shutdown")
-				atomic.StoreInt32(&hasReceivedSIGTERM, 1)
-				discoveryMetrics.StopAutoExpiration()
-				// probably should poke other go routines to stop cleanly here ...
-				_ = inst.AuditOperation("shutdown", nil, "Triggered via SIGTERM")
-				timeout := time.After(shutdownWaitTime)
-				func() {
-					for {
-						count := atomic.LoadInt32(&shardsLockCounter)
-						if count == 0 {
-							return
-						}
-						select {
-						case <-timeout:
-							log.Infof("wait for lock release timed out. Some locks might not have been released.")
-							return
-						default:
-							time.Sleep(100 * time.Millisecond)
-						}
-					}
-				}()
-				log.Infof("Shutting down vtorc")
-				os.Exit(0)
-			}
+		for range c {
+			log.Infof("Received SIGHUP. Reloading configuration")
+			_ = inst.AuditOperation("reload-configuration", nil, "Triggered via SIGHUP")
+			config.Reload()
+			discoveryMetrics.SetExpirePeriod(time.Duration(config.DiscoveryCollectionRetentionSeconds) * time.Second)
 		}
 	}()
 }
 
+// closeVTOrc runs all the operations required to cleanly shutdown VTOrc
+func closeVTOrc() {
+	log.Infof("Starting VTOrc shutdown")
+	atomic.StoreInt32(&hasReceivedSIGTERM, 1)
+	discoveryMetrics.StopAutoExpiration()
+	// Poke other go routines to stop cleanly here ...
+	_ = inst.AuditOperation("shutdown", nil, "Triggered via SIGTERM")
+	// wait for the locks to be released
+	waitForLocksRelease()
+	log.Infof("VTOrc closed")
+}
+
+// waitForLocksRelease is used to wait for release of locks
+func waitForLocksRelease() {
+	timeout := time.After(shutdownWaitTime)
+	for {
+		count := atomic.LoadInt32(&shardsLockCounter)
+		if count == 0 {
+			break
+		}
+		select {
+		case <-timeout:
+			log.Infof("wait for lock release timed out. Some locks might not have been released.")
+		default:
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+		break
+	}
+}
+
 // handleDiscoveryRequests iterates the discoveryQueue channel and calls upon
 // instance discovery per entry.
 func handleDiscoveryRequests() {
@@ -358,7 +361,9 @@ func ContinuousDiscovery() {
 	go func() {
 		_ = ometrics.InitMetrics()
 	}()
-	go acceptSignals()
+	go acceptSighupSignal()
+	// On termination of the server, we should close VTOrc cleanly
+	servenv.OnTermSync(closeVTOrc)
 
 	log.Infof("continuous discovery: starting")
 	for {
diff --git a/go/vt/vtorc/logic/orchestrator_test.go b/go/vt/vtorc/logic/orchestrator_test.go
new file mode 100644
index 00000000000..c8f2ac3bfdc
--- /dev/null
+++ b/go/vt/vtorc/logic/orchestrator_test.go
@@ -0,0 +1,56 @@
+package logic
+
+import (
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWaitForLocksRelease(t *testing.T) {
+	oldShutdownWaitTime := shutdownWaitTime
+	// Restore initial values
+	defer func() {
+		shutdownWaitTime = oldShutdownWaitTime
+	}()
+
+	t.Run("No locks to wait for", func(t *testing.T) {
+		// Initially when shardsLockCounter is zero, waitForLocksRelease should run immediately
+		timeSpent := waitForLocksReleaseAndGetTimeWaitedFor()
+		assert.Less(t, timeSpent, 1*time.Second, "waitForLocksRelease should run immediately if there are no locks to wait for")
+	})
+
+	t.Run("Timeout from shutdownWaitTime", func(t *testing.T) {
+		// Increment shardsLockCounter to simulate locking of a shard
+		atomic.AddInt32(&shardsLockCounter, +1)
+		defer func() {
+			// Restore the initial value
+			atomic.StoreInt32(&shardsLockCounter, 0)
+		}()
+		shutdownWaitTime = 200 * time.Millisecond
+		timeSpent := waitForLocksReleaseAndGetTimeWaitedFor()
+		assert.Greater(t, timeSpent, 100*time.Millisecond, "waitForLocksRelease should timeout after 200 milliseconds and not before")
+		assert.Less(t, timeSpent, 300*time.Millisecond, "waitForLocksRelease should timeout after 200 milliseconds and not take any longer")
+	})
+
+	t.Run("Successful wait for locks release", func(t *testing.T) {
+		// Increment shardsLockCounter to simulate locking of a shard
+		atomic.AddInt32(&shardsLockCounter, +1)
+		shutdownWaitTime = 500 * time.Millisecond
+		// Release the locks after 200 milliseconds
+		go func() {
+			time.Sleep(200 * time.Millisecond)
+			atomic.StoreInt32(&shardsLockCounter, 0)
+		}()
+		timeSpent := waitForLocksReleaseAndGetTimeWaitedFor()
+		assert.Greater(t, timeSpent, 100*time.Millisecond, "waitForLocksRelease should wait for the locks and not return early")
+		assert.Less(t, timeSpent, 300*time.Millisecond, "waitForLocksRelease should be successful after 200 milliseconds as all the locks are released")
+	})
+}
+
+func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
+	start := time.Now()
+	waitForLocksRelease()
+	return time.Since(start)
+}

From 7edb23047820d9e33b07b7817229a3fe3e91ae70 Mon Sep 17 00:00:00 2001
From: Manan Gupta
Date: Thu, 6 Oct 2022 10:41:27 +0530
Subject: [PATCH 2/2] feat: fix comment for acceptSighupSignal function

Signed-off-by: Manan Gupta
---
 go/vt/vtorc/logic/orchestrator.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/vt/vtorc/logic/orchestrator.go b/go/vt/vtorc/logic/orchestrator.go
index e83bdc0e981..ffc5b789aeb 100644
--- a/go/vt/vtorc/logic/orchestrator.go
+++ b/go/vt/vtorc/logic/orchestrator.go
@@ -105,7 +105,7 @@ func instancePollSecondsDuration() time.Duration {
 	return time.Duration(config.Config.InstancePollSeconds) * time.Second
 }
 
-// acceptSighupSignal registers for OS signals
+// acceptSighupSignal registers for SIGHUP signal from the OS to reload the configuration files.
 func acceptSighupSignal() {
 	c := make(chan os.Signal, 1)
 