Skip to content
10 changes: 6 additions & 4 deletions frankenphp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,12 @@ func testRequestHeaders(t *testing.T, opts *testOptions) {
}

func TestFailingWorker(t *testing.T) {
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) {
body, _ := testGet("http://example.com/failing-worker.php", handler, t)
assert.Contains(t, body, "ok")
}, &testOptions{workerScript: "failing-worker.php"})
err := frankenphp.Init(
frankenphp.WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil))),
frankenphp.WithWorkers("failing worker", "testdata/failing-worker.php", 4, frankenphp.WithWorkerMaxFailures(1)),
frankenphp.WithNumThreads(5),
)
assert.Error(t, err, "should return an immediate error if workers fail on startup")
}

func TestEnv(t *testing.T) {
Expand Down
59 changes: 0 additions & 59 deletions internal/backoff/backoff.go

This file was deleted.

41 changes: 0 additions & 41 deletions internal/backoff/backoff_test.go

This file was deleted.

13 changes: 6 additions & 7 deletions phpmainthread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,19 @@ func TestFinishBootingAWorkerScript(t *testing.T) {

func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) {
workers = []*worker{}
w, err1 := newWorker(workerOpt{fileName: "filename.php", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
w, err1 := newWorker(workerOpt{fileName: "filename.php"})
workers = append(workers, w)
_, err2 := newWorker(workerOpt{fileName: "filename.php", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
_, err2 := newWorker(workerOpt{fileName: "filename.php"})

assert.NoError(t, err1)
assert.Error(t, err2, "two workers cannot have the same filename")
}

func TestReturnAnErrorIf2ModuleWorkersHaveTheSameName(t *testing.T) {
workers = []*worker{}
w, err1 := newWorker(workerOpt{fileName: "filename.php", name: "workername", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
w, err1 := newWorker(workerOpt{fileName: "filename.php", name: "workername"})
workers = append(workers, w)
_, err2 := newWorker(workerOpt{fileName: "filename2.php", name: "workername", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
_, err2 := newWorker(workerOpt{fileName: "filename2.php", name: "workername"})

assert.NoError(t, err1)
assert.Error(t, err2, "two workers cannot have the same name")
Expand All @@ -198,9 +198,8 @@ func getDummyWorker(fileName string) *worker {
workers = []*worker{}
}
worker, _ := newWorker(workerOpt{
fileName: testDataPath + "/" + fileName,
num: 1,
maxConsecutiveFailures: defaultMaxConsecutiveFailures,
fileName: testDataPath + "/" + fileName,
num: 1,
})
workers = append(workers, worker)
return worker
Expand Down
17 changes: 3 additions & 14 deletions testdata/failing-worker.php
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
<?php

$fail = random_int(1, 100) < 10;
$wait = random_int(1000 * 100, 1000 * 500); // wait 100ms - 500ms

usleep($wait);
if ($fail) {
exit(1);
if (rand(1, 100) <= 50) {
throw new Exception('this exception is expected to fail the worker');
}

while (frankenphp_handle_request(function () {
echo "ok";
})) {
$fail = random_int(1, 100) < 10;
if ($fail) {
exit(1);
}
}
// frankenphp_handle_request() has not been reached (also a failure)
41 changes: 24 additions & 17 deletions threadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"
"unsafe"

"github.com/dunglas/frankenphp/internal/backoff"
"github.com/dunglas/frankenphp/internal/state"
)

Expand All @@ -23,20 +22,15 @@ type workerThread struct {
worker *worker
dummyContext *frankenPHPContext
workerContext *frankenPHPContext
backoff *backoff.ExponentialBackoff
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
failureCount int // number of consecutive startup failures
}

func convertToWorkerThread(thread *phpThread, worker *worker) {
thread.setHandler(&workerThread{
state: thread.state,
thread: thread,
worker: worker,
backoff: &backoff.ExponentialBackoff{
MaxBackoff: 1 * time.Second,
MinBackoff: 100 * time.Millisecond,
MaxConsecutiveFailures: worker.maxConsecutiveFailures,
},
})
worker.attachThread(thread)
}
Expand Down Expand Up @@ -92,7 +86,6 @@ func (handler *workerThread) name() string {
}

func setupWorkerScript(handler *workerThread, worker *worker) {
handler.backoff.Wait()
metrics.StartWorker(worker.name)

if handler.state.Is(state.Ready) {
Expand Down Expand Up @@ -132,7 +125,6 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
// on exit status 0 we just run the worker script again
if exitStatus == 0 && !handler.isBootingScript {
metrics.StopWorker(worker.name, StopReasonRestart)
handler.backoff.RecordSuccess()
logger.LogAttrs(ctx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus))

return
Expand All @@ -148,16 +140,30 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
return
}

logger.LogAttrs(ctx, slog.LevelError, "worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))

// panic after exponential backoff if the worker has never reached frankenphp_handle_request
if handler.backoff.RecordFailure() {
if !watcherIsEnabled && !handler.state.Is(state.Ready) {
logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount()))
panic("too many consecutive worker failures")
if worker.maxConsecutiveFailures >= 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures {
select {
case startupFailChan <- fmt.Errorf("worker failure: script %s has not reached frankenphp_handle_request()", worker.fileName):
handler.thread.state.Set(state.ShuttingDown)
return
}
logger.LogAttrs(ctx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount()))
}

if watcherIsEnabled {
// worker script has probably failed due to script changes while watcher is enabled
logger.LogAttrs(ctx, slog.LevelWarn, "(watcher enabled) worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
} else {
// rare case where worker script has failed on a restart during normal operation
// this can happen if startup success depends on external resources
logger.LogAttrs(ctx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
}

// wait a bit and try again (exponential backoff)
backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond
if backoffDuration > time.Second {
backoffDuration = time.Second
}
handler.failureCount++
time.Sleep(backoffDuration)
Comment on lines +161 to +166
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual backoff logic is just 6 lines of code, so probably no module or library necessary

}

// waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
Expand All @@ -171,6 +177,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
// Clear the first dummy request created to initialize the worker
if handler.isBootingScript {
handler.isBootingScript = false
handler.failureCount = 0
if !C.frankenphp_shutdown_dummy_request() {
panic("Not in CGI context")
}
Expand Down
25 changes: 18 additions & 7 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,52 @@ type worker struct {
var (
workers []*worker
watcherIsEnabled bool
startupFailChan chan (error)
)

func initWorkers(opt []workerOpt) error {
workers = make([]*worker, 0, len(opt))
workersReady := sync.WaitGroup{}
directoriesToWatch := getDirectoriesToWatch(opt)
watcherIsEnabled = len(directoriesToWatch) > 0
totalThreadsToStart := 0

for _, o := range opt {
w, err := newWorker(o)
if err != nil {
return err
}
totalThreadsToStart += w.num
workers = append(workers, w)
}

startupFailChan = make(chan error, totalThreadsToStart)
var workersReady sync.WaitGroup
for _, w := range workers {
workersReady.Add(w.num)
for i := 0; i < w.num; i++ {
thread := getInactivePHPThread()
convertToWorkerThread(thread, w)
go func() {
thread.state.WaitFor(state.Ready)
workersReady.Done()
}()
workersReady.Go(func() {
thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done)
})
}
}

workersReady.Wait()

select {
case err := <-startupFailChan:
// at least 1 worker has failed, shut down and return an error
Shutdown()
return fmt.Errorf("failed to initialize workers: %w", err)
default:
// all workers started successfully
startupFailChan = nil
}

if !watcherIsEnabled {
return nil
}

watcherIsEnabled = true
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
return err
}
Expand Down