Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 23 additions & 38 deletions threadFramework.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,49 +60,34 @@ func RegisterExternalWorker(worker WorkerExtension) {

// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
go func() {
defer func() {
if r := recover(); r != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r))
}
}()
for {
rq := externalWorker.ProvideRequest()

for {
var rq *WorkerRequest
func() {
defer func() {
if r := recover(); r != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r))
rq = nil
}
}()
rq = externalWorker.ProvideRequest()
}()
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
continue
}

if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
continue
}

r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response

if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))

// Queue the request and wait for completion if Done channel was provided
w.requestChan <- fc
if rq.Done != nil {
go func() {
<-fc.done
close(rq.Done)
}()
}
w.requestChan <- fc
if rq.Done != nil {
go func() {
<-fc.done
close(rq.Done)
}()
}
}
}()
}
}
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error {
// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
startExternalWorkerPipe(w, workerThread.externalWorker, thread)
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()
Expand Down