diff --git a/caddy/caddy.go b/caddy/caddy.go index 4434a6e3c..7bafed52e 100644 --- a/caddy/caddy.go +++ b/caddy/caddy.go @@ -113,7 +113,7 @@ func (f *FrankenPHPApp) Stop() error { // note: Exiting() is currently marked as 'experimental' // https://github.com/caddyserver/caddy/blob/e76405d55058b0a3e5ba222b44b5ef00516116aa/caddy.go#L810 if caddy.Exiting() { - frankenphp.Shutdown() + frankenphp.DrainWorkers() } // reset configuration so it doesn't bleed into later tests diff --git a/frankenphp.go b/frankenphp.go index f7c43277c..1706c086c 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -372,7 +372,7 @@ func Shutdown() { return } - drainWorkers() + drainWatcher() drainAutoScaling() drainPHPThreads() diff --git a/metrics.go b/metrics.go index ba9bfb942..2d87199a6 100644 --- a/metrics.go +++ b/metrics.go @@ -380,9 +380,9 @@ func (m *PrometheusMetrics) Shutdown() { } if err := m.registry.Register(m.queueDepth); err != nil && - !errors.As(err, &prometheus.AlreadyRegisteredError{}) { - panic(err) - } + !errors.As(err, &prometheus.AlreadyRegisteredError{}) { + panic(err) + } } func getWorkerNameForMetrics(name string) string { @@ -432,9 +432,9 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics { } if err := m.registry.Register(m.queueDepth); err != nil && - !errors.As(err, &prometheus.AlreadyRegisteredError{}) { - panic(err) - } + !errors.As(err, &prometheus.AlreadyRegisteredError{}) { + panic(err) + } return m } diff --git a/worker.go b/worker.go index b366e37ea..9291846db 100644 --- a/worker.go +++ b/worker.go @@ -86,18 +86,14 @@ func newWorker(o workerOpt) (*worker, error) { return w, nil } -func drainWorkers() { - watcher.DrainWatcher() +// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown +func DrainWorkers() { + _ = drainWorkerThreads() } -// RestartWorkers attempts to restart all workers gracefully -func RestartWorkers() { - // disallow scaling threads while restarting workers - scalingMu.Lock() - defer scalingMu.Unlock() - +func drainWorkerThreads() []*phpThread { ready := sync.WaitGroup{} - threadsToRestart := make([]*phpThread, 0) + drainedThreads := make([]*phpThread, 0) for _, worker := range workers { worker.threadMutex.RLock() ready.Add(len(worker.threads)) @@ -108,7 +104,7 @@ func RestartWorkers() { continue } close(thread.drainChan) - threadsToRestart = append(threadsToRestart, thread) + drainedThreads = append(drainedThreads, thread) go func(thread *phpThread) { thread.state.waitFor(stateYielding) ready.Done() @@ -116,9 +112,25 @@ func RestartWorkers() { } worker.threadMutex.RUnlock() } - ready.Wait() + return drainedThreads +} + +func drainWatcher() { + if watcherIsEnabled { + watcher.DrainWatcher() + } +} + +// RestartWorkers attempts to restart all workers gracefully +func RestartWorkers() { + // disallow scaling threads while restarting workers + scalingMu.Lock() + defer scalingMu.Unlock() + + threadsToRestart := drainWorkerThreads() + for _, thread := range threadsToRestart { thread.drainChan = make(chan struct{}) thread.state.set(stateReady)