Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 43 additions & 38 deletions go/vt/vtorc/logic/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"syscall"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"github.com/sjmudd/stopwatch"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vtorc/collection"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/discovery"
Expand Down Expand Up @@ -105,49 +105,52 @@ func instancePollSecondsDuration() time.Duration {
return time.Duration(config.Config.InstancePollSeconds) * time.Second
}

// acceptSignals registers for OS signals
func acceptSignals() {
// acceptSighupSignal registers for SIGHUP signal from the OS to reload the configuration files.
func acceptSighupSignal() {
c := make(chan os.Signal, 1)

signal.Notify(c, syscall.SIGHUP)
signal.Notify(c, syscall.SIGTERM)
go func() {
for sig := range c {
switch sig {
case syscall.SIGHUP:
log.Infof("Received SIGHUP. Reloading configuration")
_ = inst.AuditOperation("reload-configuration", nil, "Triggered via SIGHUP")
config.Reload()
discoveryMetrics.SetExpirePeriod(time.Duration(config.DiscoveryCollectionRetentionSeconds) * time.Second)
case syscall.SIGTERM:
log.Infof("Received SIGTERM. Starting shutdown")
atomic.StoreInt32(&hasReceivedSIGTERM, 1)
discoveryMetrics.StopAutoExpiration()
// probably should poke other go routines to stop cleanly here ...
_ = inst.AuditOperation("shutdown", nil, "Triggered via SIGTERM")
timeout := time.After(shutdownWaitTime)
func() {
for {
count := atomic.LoadInt32(&shardsLockCounter)
if count == 0 {
return
}
select {
case <-timeout:
log.Infof("wait for lock release timed out. Some locks might not have been released.")
return
default:
time.Sleep(100 * time.Millisecond)
}
}
}()
log.Infof("Shutting down vtorc")
os.Exit(0)
}
for range c {
log.Infof("Received SIGHUP. Reloading configuration")
_ = inst.AuditOperation("reload-configuration", nil, "Triggered via SIGHUP")
config.Reload()
discoveryMetrics.SetExpirePeriod(time.Duration(config.DiscoveryCollectionRetentionSeconds) * time.Second)
}
}()
}

// closeVTOrc runs all the operations required to cleanly shutdown VTOrc.
// It is registered with servenv.OnTermSync so it runs on server termination,
// and it blocks until shard locks are released or the wait times out.
func closeVTOrc() {
	log.Infof("Starting VTOrc shutdown")
	// Record that termination was requested; presumably polling loops elsewhere
	// check hasReceivedSIGTERM to stop scheduling new work — TODO confirm.
	atomic.StoreInt32(&hasReceivedSIGTERM, 1)
	discoveryMetrics.StopAutoExpiration()
	// Poke other go routines to stop cleanly here ...
	// Best-effort audit entry; the error is deliberately ignored during shutdown.
	_ = inst.AuditOperation("shutdown", nil, "Triggered via SIGTERM")
	// wait for the locks to be released (bounded by shutdownWaitTime)
	waitForLocksRelease()
	log.Infof("VTOrc closed")
}

// waitForLocksRelease blocks until every shard lock (as tracked by
// shardsLockCounter) has been released, or until shutdownWaitTime elapses,
// whichever happens first. On timeout it logs and returns; it never fails.
func waitForLocksRelease() {
	deadline := time.After(shutdownWaitTime)
	for atomic.LoadInt32(&shardsLockCounter) != 0 {
		select {
		case <-deadline:
			log.Infof("wait for lock release timed out. Some locks might not have been released.")
			return
		default:
			// Locks are still held and the deadline has not passed; poll again shortly.
			time.Sleep(50 * time.Millisecond)
		}
	}
}

// handleDiscoveryRequests iterates the discoveryQueue channel and calls upon
// instance discovery per entry.
func handleDiscoveryRequests() {
Expand Down Expand Up @@ -358,7 +361,9 @@ func ContinuousDiscovery() {
go func() {
_ = ometrics.InitMetrics()
}()
go acceptSignals()
go acceptSighupSignal()
// On termination of the server, we should close VTOrc cleanly
servenv.OnTermSync(closeVTOrc)

log.Infof("continuous discovery: starting")
for {
Expand Down
56 changes: 56 additions & 0 deletions go/vt/vtorc/logic/orchestrator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package logic

import (
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestWaitForLocksRelease(t *testing.T) {
	// Save the package-level wait timeout, which the subtests below mutate,
	// and restore it when the test finishes.
	originalWaitTime := shutdownWaitTime
	defer func() {
		shutdownWaitTime = originalWaitTime
	}()

	t.Run("No locks to wait for", func(t *testing.T) {
		// With shardsLockCounter at zero, the wait should return right away.
		elapsed := waitForLocksReleaseAndGetTimeWaitedFor()
		assert.Less(t, elapsed, 1*time.Second, "waitForLocksRelease should run immediately if there are no locks to wait for")
	})

	t.Run("Timeout from shutdownWaitTime", func(t *testing.T) {
		// Simulate a held shard lock that is never released, forcing a timeout.
		atomic.AddInt32(&shardsLockCounter, 1)
		defer atomic.StoreInt32(&shardsLockCounter, 0)
		shutdownWaitTime = 200 * time.Millisecond
		elapsed := waitForLocksReleaseAndGetTimeWaitedFor()
		assert.Greater(t, elapsed, 100*time.Millisecond, "waitForLocksRelease should timeout after 200 milliseconds and not before")
		assert.Less(t, elapsed, 300*time.Millisecond, "waitForLocksRelease should timeout after 200 milliseconds and not take any longer")
	})

	t.Run("Successful wait for locks release", func(t *testing.T) {
		// Simulate a held shard lock that is released while we are waiting.
		atomic.AddInt32(&shardsLockCounter, 1)
		shutdownWaitTime = 500 * time.Millisecond
		go func() {
			time.Sleep(200 * time.Millisecond)
			atomic.StoreInt32(&shardsLockCounter, 0)
		}()
		elapsed := waitForLocksReleaseAndGetTimeWaitedFor()
		assert.Greater(t, elapsed, 100*time.Millisecond, "waitForLocksRelease should wait for the locks and not return early")
		assert.Less(t, elapsed, 300*time.Millisecond, "waitForLocksRelease should be successful after 200 milliseconds as all the locks are released")
	})
}

// waitForLocksReleaseAndGetTimeWaitedFor invokes waitForLocksRelease and
// reports how long the call took to return.
func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
	begin := time.Now()
	waitForLocksRelease()
	return time.Since(begin)
}