From 73eb0be698600f5696da1b27f377db17cea10dcc Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Thu, 7 Aug 2025 21:25:00 +0200 Subject: [PATCH 01/19] create a simple thread framework Signed-off-by: Robert Landers --- frankenphp.go | 5 +++++ threadFramework.go | 49 +++++++++++++++++++++++++++++++++++++++++ threadFramework_test.go | 1 + threadworker.go | 16 ++++++++++++++ worker.go | 29 ++++++++++++++++++++---- 5 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 threadFramework.go create mode 100644 threadFramework_test.go diff --git a/frankenphp.go b/frankenphp.go index 58cd95b89..78d25308c 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -214,6 +214,11 @@ func Init(options ...Option) error { registerExtensions() + // add registered external workers + for _, ew := range externalWorkers { + options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.GetMinThreads(), WithWorkerEnv(ew.Env()))) + } + opt := &opt{} for _, o := range options { if err := o(opt); err != nil { diff --git a/threadFramework.go b/threadFramework.go new file mode 100644 index 000000000..3e278234d --- /dev/null +++ b/threadFramework.go @@ -0,0 +1,49 @@ +package frankenphp + +import ( + "net/http" + "sync" +) + +// WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on +// frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard +// http.Request that will be conferred to the underlying worker script. +// +// A worker script with the provided Name and FileName will be registered, along with the provided +// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to +// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive. +// ProvideBackPressure allows you to autoscale your threads from the free threads in frankenphp's thread pool. These +// methods are only called once at startup, so register them in an init() function. +// +// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId; +// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool, +// you will receive a call to ThreadDrainNotification that will inform you of the threadId. +// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called. +// +// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with +// a request. +// +// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be +// allocated, then frankenphp will panic and provide this information to the user (who will need to allocation more +// total threads). Don't be greedy. Use ProvideBackPressure to indicate when you receive a request to trigger +// autoscaling. +type WorkerExtension interface { + Name() string + FileName() string + Env() PreparedEnv + GetMinThreads() int + ThreadActivatedNotification(threadId int) + ThreadDrainNotification(threadId int) + ThreadDeactivatedNotification(threadId int) + ProvideRequest() <-chan *http.Request +} + +var externalWorkers = make(map[string]WorkerExtension) +var externalWorkerMutex sync.Mutex + +func RegisterExternalWorker(worker WorkerExtension) { + externalWorkerMutex.Lock() + defer externalWorkerMutex.Unlock() + + externalWorkers[worker.Name()] = worker +} diff --git a/threadFramework_test.go b/threadFramework_test.go new file mode 100644 index 000000000..7076daa03 --- /dev/null +++ b/threadFramework_test.go @@ -0,0 +1 @@ +package frankenphp diff --git a/threadworker.go b/threadworker.go index b7dc82036..33ca6453b 100644 --- a/threadworker.go +++ b/threadworker.go @@ -19,10 +19,13 @@ type workerThread struct { dummyContext *frankenPHPContext workerContext *frankenPHPContext backoff *exponentialBackoff + externalWorker WorkerExtension isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet } func convertToWorkerThread(thread *phpThread, worker *worker) { + externalWorker, _ := externalWorkers[worker.name] + thread.setHandler(&workerThread{ state: thread.state, thread: thread, @@ -32,6 +35,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { minBackoff: 100 * time.Millisecond, maxConsecutiveFailures: worker.maxConsecutiveFailures, }, + externalWorker: externalWorker, }) worker.attachThread(thread) } @@ -40,16 +44,28 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { func (handler *workerThread) beforeScriptExecution() string { switch handler.state.get() { case stateTransitionRequested: + if handler.externalWorker != nil { + handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex) + } handler.worker.detachThread(handler.thread) return handler.thread.transitionToNewHandler() case stateRestarting: + if handler.externalWorker != nil { + handler.externalWorker.ThreadDrainNotification(handler.thread.threadIndex) + } handler.state.set(stateYielding) handler.state.waitFor(stateReady, stateShuttingDown) return handler.beforeScriptExecution() case stateReady, stateTransitionComplete: + if handler.externalWorker != nil { + handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex) + } setupWorkerScript(handler, handler.worker) return handler.worker.fileName case stateShuttingDown: + if handler.externalWorker != nil { + handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex) + } handler.worker.detachThread(handler.thread) // signal to stop return "" diff --git a/worker.go b/worker.go index 04772fa4a..9613945e9 100644 --- a/worker.go +++ b/worker.go @@ -3,7 +3,9 @@ package frankenphp // #include "frankenphp.h" import "C" import ( + "context" "fmt" + "log/slog" "strings" "sync" "time" @@ -44,13 +46,32 @@ func initWorkers(opt []workerOpt) error { workers = append(workers, w) } - for _, worker := range workers { - workersReady.Add(worker.num) - for i := 0; i < worker.num; i++ { + for _, w := range workers { + workersReady.Add(w.num) + for i := 0; i < w.num; i++ { thread := getInactivePHPThread() - convertToWorkerThread(thread, worker) + convertToWorkerThread(thread, w) go func() { thread.state.waitFor(stateReady) + + // create a pipe from the external worker to the main worker + // note: this is locked to the initial thread size the external worker requested + if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil { + go func(w *worker, externalWorker WorkerExtension) { + for { + // todo: handle shutdown + r := <-externalWorker.ProvideRequest() + fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) + if err != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) + continue + } + if fc, ok := fromContext(fr.Context()); ok { + w.requestChan <- fc + } + } + }(w, workerThread.externalWorker) + } workersReady.Done() }() } From 46dd89ca501a52ca972e4f6a659c1c424a5a775c Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Thu, 7 Aug 2025 22:03:42 +0200 Subject: [PATCH 02/19] add tests Signed-off-by: Robert Landers --- threadFramework.go | 7 ++++++- threadFramework_test.go | 1 - worker.go | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) delete mode 100644 threadFramework_test.go diff --git a/threadFramework.go b/threadFramework.go index 3e278234d..ec925254b 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -35,7 +35,12 @@ type WorkerExtension interface { ThreadActivatedNotification(threadId int) ThreadDrainNotification(threadId int) ThreadDeactivatedNotification(threadId int) - ProvideRequest() <-chan *http.Request + ProvideRequest() <-chan *WorkerRequest +} + +type WorkerRequest struct { + Request *http.Request + Response http.ResponseWriter } var externalWorkers = make(map[string]WorkerExtension) diff --git a/threadFramework_test.go b/threadFramework_test.go deleted file mode 100644 index 7076daa03..000000000 --- a/threadFramework_test.go +++ /dev/null @@ -1 +0,0 @@ -package frankenphp diff --git a/worker.go b/worker.go index 9613945e9..866d97a85 100644 --- a/worker.go +++ b/worker.go @@ -60,13 +60,15 @@ func initWorkers(opt []workerOpt) error { go func(w *worker, externalWorker WorkerExtension) { for { // todo: handle shutdown - r := <-externalWorker.ProvideRequest() + rq := <-externalWorker.ProvideRequest() + r := rq.Request fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) if err != nil { logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) continue } if fc, ok := fromContext(fr.Context()); ok { + fc.responseWriter = rq.Response w.requestChan <- fc } } From fc2e183b439373e8ae62657abb44445704acca5e Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Thu, 7 Aug 2025 22:24:38 +0200 Subject: [PATCH 03/19] fix comment Signed-off-by: Robert Landers --- threadFramework.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index ec925254b..e8fdcaa38 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -25,8 +25,7 @@ import ( // // Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be // allocated, then frankenphp will panic and provide this information to the user (who will need to allocation more -// total threads). Don't be greedy. Use ProvideBackPressure to indicate when you receive a request to trigger -// autoscaling. +// total threads). Don't be greedy. type WorkerExtension interface { Name() string FileName() string From f5246304a9168b0a5b074fd4beec7ddb5ba08bff Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Thu, 7 Aug 2025 22:31:45 +0200 Subject: [PATCH 04/19] remove mention of an old function that no longer exists Signed-off-by: Robert Landers --- threadFramework.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index e8fdcaa38..c1191307b 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -12,8 +12,7 @@ import ( // A worker script with the provided Name and FileName will be registered, along with the provided // configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to // reserve a minimum number of threads from the frankenphp thread pool. This number must be positive. -// ProvideBackPressure allows you to autoscale your threads from the free threads in frankenphp's thread pool. These -// methods are only called once at startup, so register them in an init() function. +// These methods are only called once at startup, so register them in an init() function. // // When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId; // this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool, From 2b81fb4a473b267b67c7a8ef1a9ba160da01eb95 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Thu, 7 Aug 2025 22:52:04 +0200 Subject: [PATCH 05/19] simplify providing a request Signed-off-by: Robert Landers --- threadFramework.go | 2 +- worker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index c1191307b..d687080d9 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -33,7 +33,7 @@ type WorkerExtension interface { ThreadActivatedNotification(threadId int) ThreadDrainNotification(threadId int) ThreadDeactivatedNotification(threadId int) - ProvideRequest() <-chan *WorkerRequest + ProvideRequest() *WorkerRequest } type WorkerRequest struct { diff --git a/worker.go b/worker.go index 866d97a85..2810aabef 100644 --- a/worker.go +++ b/worker.go @@ -60,7 +60,7 @@ func initWorkers(opt []workerOpt) error { go func(w *worker, externalWorker WorkerExtension) { for { // todo: handle shutdown - rq := <-externalWorker.ProvideRequest() + rq := externalWorker.ProvideRequest() r := rq.Request fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) if err != nil { From 3dbd9ca25ae69f51cb362a7cb8bbb71d5c622d07 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Thu, 7 Aug 2025 22:58:34 +0200 Subject: [PATCH 06/19] satisfy linter Signed-off-by: Robert Landers --- threadworker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/threadworker.go b/threadworker.go index 33ca6453b..9da47b2b7 100644 --- a/threadworker.go +++ b/threadworker.go @@ -24,7 +24,7 @@ type workerThread struct { } func convertToWorkerThread(thread *phpThread, worker *worker) { - externalWorker, _ := externalWorkers[worker.name] + externalWorker := externalWorkers[worker.name] thread.setHandler(&workerThread{ state: thread.state, From 05c943391ec9bf93d48362e7d681867b197ee762 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 12:18:05 +0200 Subject: [PATCH 07/19] add error handling and handle shutdowns Signed-off-by: Robert Landers --- frankenphp.go | 1 + threadFramework.go | 91 +++++++++++++++++++++++++++++++++++++++++++++- worker.go | 19 +--------- 3 files changed, 91 insertions(+), 20 deletions(-) diff --git a/frankenphp.go b/frankenphp.go index 78d25308c..604e48a5d 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -302,6 +302,7 @@ func Shutdown() { drainWatcher() drainAutoScaling() + drainExternalWorkerPipes() drainPHPThreads() metrics.Shutdown() diff --git a/threadFramework.go b/threadFramework.go index d687080d9..69a8eb316 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -1,6 +1,8 @@ package frankenphp import ( + "context" + "log/slog" "net/http" "sync" ) @@ -20,10 +22,11 @@ import ( // After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called. // // Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with -// a request. +// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response. +// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. // // Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be -// allocated, then frankenphp will panic and provide this information to the user (who will need to allocation more +// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more // total threads). Don't be greedy. type WorkerExtension interface { Name() string @@ -44,9 +47,93 @@ type WorkerRequest struct { var externalWorkers = make(map[string]WorkerExtension) var externalWorkerMutex sync.Mutex +var externalWorkerPipes = make(map[string]context.CancelFunc) +var externalWorkerPipesMutex sync.Mutex + func RegisterExternalWorker(worker WorkerExtension) { externalWorkerMutex.Lock() defer externalWorkerMutex.Unlock() externalWorkers[worker.Name()] = worker } + +// startExternalWorkerPipe creates a pipe from an external worker to the main worker. +func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { + ctx, cancel := context.WithCancel(context.Background()) + + // Register the cancel function for shutdown + externalWorkerPipesMutex.Lock() + externalWorkerPipes[w.name] = cancel + externalWorkerPipesMutex.Unlock() + + go func() { + defer func() { + if r := recover(); r != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r)) + } + externalWorkerPipesMutex.Lock() + delete(externalWorkerPipes, w.name) + externalWorkerPipesMutex.Unlock() + }() + + for { + select { + case <-ctx.Done(): + logger.LogAttrs(context.Background(), slog.LevelDebug, "external worker pipe shutting down", slog.String("worker", w.name)) + return + default: + } + + var rq *WorkerRequest + func() { + defer func() { + if r := recover(); r != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r)) + rq = nil + } + }() + rq = externalWorker.ProvideRequest() + }() + + if rq == nil || rq.Request == nil { + logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) + continue + } + + r := rq.Request + fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) + if err != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) + continue + } + + if fc, ok := fromContext(fr.Context()); ok { + fc.responseWriter = rq.Response + + select { + case w.requestChan <- fc: + // Request successfully queued + case <-ctx.Done(): + fc.reject(503, "Service Unavailable") + return + } + } + } + }() +} + +// drainExternalWorkerPipes shuts down all external worker pipes gracefully +func drainExternalWorkerPipes() { + externalWorkerPipesMutex.Lock() + defer externalWorkerPipesMutex.Unlock() + + logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down external worker pipes", slog.Int("count", len(externalWorkerPipes))) + + for workerName, cancel := range externalWorkerPipes { + logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down external worker pipe", slog.String("worker", workerName)) + cancel() + } + + // Clear the map + externalWorkerPipes = make(map[string]context.CancelFunc) +} diff --git a/worker.go b/worker.go index 2810aabef..39417e657 100644 --- a/worker.go +++ b/worker.go @@ -3,9 +3,7 @@ package frankenphp // #include "frankenphp.h" import "C" import ( - "context" "fmt" - "log/slog" "strings" "sync" "time" @@ -57,22 +55,7 @@ func initWorkers(opt []workerOpt) error { // create a pipe from the external worker to the main worker // note: this is locked to the initial thread size the external worker requested if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil { - go func(w *worker, externalWorker WorkerExtension) { - for { - // todo: handle shutdown - rq := externalWorker.ProvideRequest() - r := rq.Request - fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) - if err != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) - continue - } - if fc, ok := fromContext(fr.Context()); ok { - fc.responseWriter = rq.Response - w.requestChan <- fc - } - } - }(w, workerThread.externalWorker) + startExternalWorkerPipe(w, workerThread.externalWorker, thread) } workersReady.Done() }() From d36bc299b55fd7751cee82d2459eac66191c8071 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 12:18:29 +0200 Subject: [PATCH 08/19] add tests Signed-off-by: Robert Landers --- threadFramework_test.go | 126 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 threadFramework_test.go diff --git a/threadFramework_test.go b/threadFramework_test.go new file mode 100644 index 000000000..e4ca4ff87 --- /dev/null +++ b/threadFramework_test.go @@ -0,0 +1,126 @@ +package frankenphp_test + +import ( + "io" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockWorkerExtension implements the frankenphp.WorkerExtension interface +type mockWorkerExtension struct { + name string + fileName string + env frankenphp.PreparedEnv + minThreads int + requestChan chan *frankenphp.WorkerRequest + activatedCount int + drainCount int + deactivatedCount int + mu sync.Mutex +} + +func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension { + return &mockWorkerExtension{ + name: name, + fileName: fileName, + env: make(frankenphp.PreparedEnv), + minThreads: minThreads, + requestChan: make(chan *frankenphp.WorkerRequest, 10), // Buffer to avoid blocking + } +} + +func (m *mockWorkerExtension) Name() string { + return m.name +} + +func (m *mockWorkerExtension) FileName() string { + return m.fileName +} + +func (m *mockWorkerExtension) Env() frankenphp.PreparedEnv { + return m.env +} + +func (m *mockWorkerExtension) GetMinThreads() int { + return m.minThreads +} + +func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) { + m.mu.Lock() + defer m.mu.Unlock() + m.activatedCount++ +} + +func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) { + m.mu.Lock() + defer m.mu.Unlock() + m.drainCount++ +} + +func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) { + m.mu.Lock() + defer m.mu.Unlock() + m.deactivatedCount++ +} + +func (m *mockWorkerExtension) ProvideRequest() *frankenphp.WorkerRequest { + return <-m.requestChan +} + +func (m *mockWorkerExtension) InjectRequest(r *frankenphp.WorkerRequest) { + m.requestChan <- r +} + +func (m *mockWorkerExtension) GetActivatedCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.activatedCount +} + +func TestWorkerExtension(t *testing.T) { + // Create a mock extension + mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1) + + // Register the mock extension + frankenphp.RegisterExternalWorker(mockExt) + + // Initialize FrankenPHP with a worker that has a different name than our extension + err := frankenphp.Init() + require.NoError(t, err) + defer frankenphp.Shutdown() + + // Wait a bit for the worker to be ready + time.Sleep(100 * time.Millisecond) + + // Verify that the extension's thread was activated + assert.GreaterOrEqual(t, mockExt.GetActivatedCount(), 1, "Thread should have been activated") + + // Create a test request + req := httptest.NewRequest("GET", "http://example.com/test/?foo=bar", nil) + req.Header.Set("X-Test-Header", "test-value") + + w := httptest.NewRecorder() + + // Inject the request into the worker through the extension + mockExt.InjectRequest(&frankenphp.WorkerRequest{ + Request: req, + Response: w, + }) + + // Wait a bit for the request to be processed + time.Sleep(100 * time.Millisecond) + + // Check the response + resp := w.Result() + body, _ := io.ReadAll(resp.Body) + + // The worker.php script should output information about the request + // We're just checking that we got a response, not the specific content + assert.NotEmpty(t, body, "Response body should not be empty") +} From 1a1f34575d3e4a57ebfc092e4769a6db8e074f20 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 13:24:29 +0200 Subject: [PATCH 09/19] pipes are tied to workers, not threads Signed-off-by: Robert Landers --- frankenphp.go | 1 - 1 file changed, 1 deletion(-) diff --git a/frankenphp.go b/frankenphp.go index 604e48a5d..78d25308c 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -302,7 +302,6 @@ func Shutdown() { drainWatcher() drainAutoScaling() - drainExternalWorkerPipes() drainPHPThreads() metrics.Shutdown() From 93f81d86d7be9dd45128e774ca7b3eedd5ba34e4 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 13:38:06 +0200 Subject: [PATCH 10/19] fix test Signed-off-by: Robert Landers --- threadFramework_test.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/threadFramework_test.go b/threadFramework_test.go index e4ca4ff87..6ca4c0830 100644 --- a/threadFramework_test.go +++ b/threadFramework_test.go @@ -1,4 +1,4 @@ -package frankenphp_test +package frankenphp import ( "io" @@ -7,18 +7,17 @@ import ( "testing" "time" - "github.com/dunglas/frankenphp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// mockWorkerExtension implements the frankenphp.WorkerExtension interface +// mockWorkerExtension implements the WorkerExtension interface type mockWorkerExtension struct { name string fileName string - env frankenphp.PreparedEnv + env PreparedEnv minThreads int - requestChan chan *frankenphp.WorkerRequest + requestChan chan *WorkerRequest activatedCount int drainCount int deactivatedCount int @@ -29,9 +28,9 @@ func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerEx return &mockWorkerExtension{ name: name, fileName: fileName, - env: make(frankenphp.PreparedEnv), + env: make(PreparedEnv), minThreads: minThreads, - requestChan: make(chan *frankenphp.WorkerRequest, 10), // Buffer to avoid blocking + requestChan: make(chan *WorkerRequest, 10), // Buffer to avoid blocking } } @@ -43,7 +42,7 @@ func (m *mockWorkerExtension) FileName() string { return m.fileName } -func (m *mockWorkerExtension) Env() frankenphp.PreparedEnv { +func (m *mockWorkerExtension) Env() PreparedEnv { return m.env } @@ -69,11 +68,11 @@ func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) { m.deactivatedCount++ } -func (m *mockWorkerExtension) ProvideRequest() *frankenphp.WorkerRequest { +func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest { return <-m.requestChan } -func (m *mockWorkerExtension) InjectRequest(r *frankenphp.WorkerRequest) { +func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest) { m.requestChan <- r } @@ -88,12 +87,17 @@ func TestWorkerExtension(t *testing.T) { mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1) // Register the mock extension - frankenphp.RegisterExternalWorker(mockExt) + RegisterExternalWorker(mockExt) + + // Clean up external workers after test to avoid interfering with other tests + defer func() { + delete(externalWorkers, mockExt.Name()) + }() // Initialize FrankenPHP with a worker that has a different name than our extension - err := frankenphp.Init() + err := Init() require.NoError(t, err) - defer frankenphp.Shutdown() + defer Shutdown() // Wait a bit for the worker to be ready time.Sleep(100 * time.Millisecond) @@ -108,7 +112,7 @@ func TestWorkerExtension(t *testing.T) { w := httptest.NewRecorder() // Inject the request into the worker through the extension - mockExt.InjectRequest(&frankenphp.WorkerRequest{ + mockExt.InjectRequest(&WorkerRequest{ Request: req, Response: w, }) From fe6ce1299082ebbb10179de9607fbc46aaeafcb8 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 14:07:10 +0200 Subject: [PATCH 11/19] add a way to detect when a request is completed Signed-off-by: Robert Landers --- threadFramework.go | 12 ++++++++++++ threadFramework_test.go | 12 ++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index 69a8eb316..81079e81d 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -42,6 +42,8 @@ type WorkerExtension interface { type WorkerRequest struct { Request *http.Request Response http.ResponseWriter + // Done is an optional channel that will be closed when the request processing is complete + Done chan struct{} } var externalWorkers = make(map[string]WorkerExtension) @@ -113,8 +115,18 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread * select { case w.requestChan <- fc: // Request successfully queued + // Wait for the request to complete and signal completion if a Done channel was provided + if rq.Done != nil { + go func() { + <-fc.done + close(rq.Done) + }() + } case <-ctx.Done(): fc.reject(503, "Service Unavailable") + if rq.Done != nil { + close(rq.Done) + } return } } diff --git a/threadFramework_test.go b/threadFramework_test.go index 6ca4c0830..e85460d44 100644 --- a/threadFramework_test.go +++ b/threadFramework_test.go @@ -88,7 +88,7 @@ func TestWorkerExtension(t *testing.T) { // Register the mock extension RegisterExternalWorker(mockExt) - + // Clean up external workers after test to avoid interfering with other tests defer func() { delete(externalWorkers, mockExt.Name()) @@ -111,16 +111,20 @@ func TestWorkerExtension(t *testing.T) { w := httptest.NewRecorder() + // Create a channel to signal when the request is done + done := make(chan struct{}) + // Inject the request into the worker through the extension mockExt.InjectRequest(&WorkerRequest{ Request: req, Response: w, + Done: done, }) - // Wait a bit for the request to be processed - time.Sleep(100 * time.Millisecond) + // Wait for the request to be fully processed + <-done - // Check the response + // Check the response - now safe from race conditions resp := w.Result() body, _ := io.ReadAll(resp.Body) From 724f068642387b363cd4c71b150531db01dfe407 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 15:13:19 +0200 Subject: [PATCH 12/19] we never shutdown workers or remove them, so we do not need this Signed-off-by: Robert Landers --- threadFramework.go | 55 +++++----------------------------------------- 1 file changed, 6 insertions(+), 49 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index 81079e81d..2656cf95b 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -49,8 +49,6 @@ type WorkerRequest struct { var externalWorkers = make(map[string]WorkerExtension) var externalWorkerMutex sync.Mutex -var externalWorkerPipes = make(map[string]context.CancelFunc) -var externalWorkerPipesMutex sync.Mutex func RegisterExternalWorker(worker WorkerExtension) { externalWorkerMutex.Lock() @@ -61,31 +59,14 @@ func RegisterExternalWorker(worker WorkerExtension) { // startExternalWorkerPipe creates a pipe from an external worker to the main worker. func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { - ctx, cancel := context.WithCancel(context.Background()) - - // Register the cancel function for shutdown - externalWorkerPipesMutex.Lock() - externalWorkerPipes[w.name] = cancel - externalWorkerPipesMutex.Unlock() - go func() { defer func() { if r := recover(); r != nil { logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r)) } - externalWorkerPipesMutex.Lock() - delete(externalWorkerPipes, w.name) - externalWorkerPipesMutex.Unlock() }() for { - select { - case <-ctx.Done(): - logger.LogAttrs(context.Background(), slog.LevelDebug, "external worker pipe shutting down", slog.String("worker", w.name)) - return - default: - } - var rq *WorkerRequest func() { defer func() { @@ -112,40 +93,16 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread * if fc, ok := fromContext(fr.Context()); ok { fc.responseWriter = rq.Response - select { - case w.requestChan <- fc: - // Request successfully queued - // Wait for the request to complete and signal completion if a Done channel was provided - if rq.Done != nil { - go func() { - <-fc.done - close(rq.Done) - }() - } - case <-ctx.Done(): - fc.reject(503, "Service Unavailable") - if rq.Done != nil { + // Queue the request and wait for completion if Done channel was provided + w.requestChan <- fc + if rq.Done != nil { + go func() { + <-fc.done close(rq.Done) - } - return + }() } } } }() } -// drainExternalWorkerPipes shuts down all external worker pipes gracefully -func drainExternalWorkerPipes() { - externalWorkerPipesMutex.Lock() - defer externalWorkerPipesMutex.Unlock() - - logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down external worker pipes", slog.Int("count", len(externalWorkerPipes))) - - for workerName, cancel := range externalWorkerPipes { - logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down external worker pipe", slog.String("worker", workerName)) - cancel() - } - - // Clear the map - externalWorkerPipes = make(map[string]context.CancelFunc) -} From cf71f1d39cdddbb4f3c77731581f8b04b31aa72f Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Sat, 9 Aug 2025 15:15:38 +0200 Subject: [PATCH 13/19] add more comments Signed-off-by: Robert Landers --- threadFramework.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index 2656cf95b..6a3315a6f 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -40,7 +40,9 @@ type WorkerExtension interface { } type WorkerRequest struct { - Request *http.Request + // The request for your worker script to handle + Request *http.Request + // Response is a response writer that provides the output of the provided request Response http.ResponseWriter // Done is an optional channel that will be closed when the request processing is complete Done chan struct{} @@ -49,7 +51,6 @@ type WorkerRequest struct { var externalWorkers = make(map[string]WorkerExtension) var externalWorkerMutex sync.Mutex - func RegisterExternalWorker(worker WorkerExtension) { externalWorkerMutex.Lock() defer externalWorkerMutex.Unlock() @@ -105,4 +106,3 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread * } }() } - From 2bfac92bd8289c70fe08ce88f60941c7d41c8034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Sat, 13 Sep 2025 16:09:57 +0200 Subject: [PATCH 14/19] Simplify modular threads (#1874) * Simplify * remove unused variable * log thread index --- threadFramework.go | 61 +++++++++++++++++----------------------------- worker.go | 2 +- 2 files changed, 24 insertions(+), 39 deletions(-) diff --git a/threadFramework.go b/threadFramework.go index 6a3315a6f..07badf8bc 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -60,49 +60,34 @@ func RegisterExternalWorker(worker WorkerExtension) { // startExternalWorkerPipe creates a pipe from an external worker to the main worker. func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { - go func() { - defer func() { - if r := recover(); r != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r)) - } - }() + for { + rq := externalWorker.ProvideRequest() - for { - var rq *WorkerRequest - func() { - defer func() { - if r := recover(); r != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r)) - rq = nil - } - }() - rq = externalWorker.ProvideRequest() - }() + if rq == nil || rq.Request == nil { + logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) + continue + } - if rq == nil || rq.Request == nil { - logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name)) - continue - } + r := rq.Request + fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) + if err != nil { + logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err)) + continue + } - r := rq.Request - fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) - if err != nil { - logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err)) - continue - } + if fc, ok := fromContext(fr.Context()); ok { + fc.responseWriter = rq.Response - if fc, ok := fromContext(fr.Context()); ok { - fc.responseWriter = rq.Response + // Queue the request and wait for completion if Done channel was provided + logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) - // Queue the request and wait for completion if Done channel was provided - w.requestChan <- fc - if rq.Done != nil { - go func() { - <-fc.done - close(rq.Done) - }() - } + w.requestChan <- fc + if rq.Done != nil { + go func() { + <-fc.done + close(rq.Done) + }() } } - }() + } } diff --git a/worker.go b/worker.go index 39417e657..dcc07444d 100644 --- a/worker.go +++ b/worker.go @@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error { // create a pipe from the external worker to the main worker // note: this is locked to the initial thread size the external worker requested if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil { - startExternalWorkerPipe(w, workerThread.externalWorker, thread) + go startExternalWorkerPipe(w, workerThread.externalWorker, thread) } workersReady.Done() }() From eab91947a3f3565e0beddaf72edc63dadc0b7f9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Wed, 17 Sep 2025 11:51:11 +0200 Subject: [PATCH 15/19] feat: allow passing parameters to the PHP callback and accessing its return value (#1881) --- context.go | 4 +++- frankenphp.c | 22 +++++++++++++++++----- threadFramework.go | 24 ++++++++++++++++-------- threadworker.go | 23 +++++++++++++++++------ 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/context.go b/context.go index 65aee5b75..2e897cd5c 100644 --- a/context.go +++ b/context.go @@ -28,7 +28,9 @@ type frankenPHPContext struct { // Whether the request is already closed by us isDone bool - responseWriter http.ResponseWriter + responseWriter http.ResponseWriter + handlerParameters any + handlerReturn any done chan any startedAt time.Time diff --git a/frankenphp.c b/frankenphp.c index 3e516ffd4..165281f42 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -432,10 +432,11 @@ PHP_FUNCTION(frankenphp_handle_request) { zend_unset_timeout(); #endif - bool has_request = go_frankenphp_worker_handle_request_start(thread_index); + struct go_frankenphp_worker_handle_request_start_return result = + go_frankenphp_worker_handle_request_start(thread_index); if (frankenphp_worker_request_startup() == FAILURE /* Shutting down */ - || !has_request) { + || !result.r0) { RETURN_FALSE; } @@ -450,10 +451,15 @@ PHP_FUNCTION(frankenphp_handle_request) { /* Call the PHP func passed to frankenphp_handle_request() */ zval retval = {0}; + zval *callback_ret = NULL; + fci.size = sizeof fci; fci.retval = &retval; - if (zend_call_function(&fci, &fcc) == SUCCESS) { - zval_ptr_dtor(&retval); + fci.params = result.r1; + fci.param_count = 1; + + if (zend_call_function(&fci, &fcc) == SUCCESS && Z_TYPE(retval) != IS_UNDEF) { + callback_ret = &retval; } /* @@ -467,7 +473,13 @@ PHP_FUNCTION(frankenphp_handle_request) { } frankenphp_worker_request_shutdown(); - go_frankenphp_finish_worker_request(thread_index); + go_frankenphp_finish_worker_request(thread_index, callback_ret); + if (result.r1 != NULL) { + zval_ptr_dtor(result.r1); + } + if (callback_ret != NULL) { + zval_ptr_dtor(&retval); + } RETURN_TRUE; } diff --git a/threadFramework.go b/threadFramework.go index 07badf8bc..a18996c7e 100644 --- a/threadFramework.go +++ b/threadFramework.go @@ -7,7 +7,7 @@ import ( "sync" ) -// WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on +// EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on // frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard // http.Request that will be conferred to the underlying worker script. // @@ -36,21 +36,25 @@ type WorkerExtension interface { ThreadActivatedNotification(threadId int) ThreadDrainNotification(threadId int) ThreadDeactivatedNotification(threadId int) - ProvideRequest() *WorkerRequest + ProvideRequest() *WorkerRequest[any, any] } -type WorkerRequest struct { +// EXPERIMENTAL +type WorkerRequest[P any, R any] struct { // The request for your worker script to handle Request *http.Request - // Response is a response writer that provides the output of the provided request + // Response is a response writer that provides the output of the provided request, it must not be nil to access the request body Response http.ResponseWriter - // Done is an optional channel that will be closed when the request processing is complete - Done chan struct{} + // CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback + CallbackParameters P + // AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter + AfterFunc func(callbackReturn R) } var externalWorkers = make(map[string]WorkerExtension) var externalWorkerMutex sync.Mutex +// EXPERIMENTAL func RegisterExternalWorker(worker WorkerExtension) { externalWorkerMutex.Lock() defer externalWorkerMutex.Unlock() @@ -77,15 +81,19 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread * if fc, ok := fromContext(fr.Context()); ok { fc.responseWriter = rq.Response + fc.handlerParameters = rq.CallbackParameters // Queue the request and wait for completion if Done channel was provided logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) w.requestChan <- fc - if rq.Done != nil { + if rq.AfterFunc != nil { go func() { <-fc.done - close(rq.Done) + + if rq.AfterFunc != nil { + rq.AfterFunc(fc.handlerReturn) + } }() } } diff --git a/threadworker.go b/threadworker.go index 9da47b2b7..ebe7fa64d 100644 --- a/threadworker.go +++ b/threadworker.go @@ -7,6 +7,7 @@ import ( "log/slog" "path/filepath" "time" + "unsafe" ) // representation of a thread assigned to a worker script @@ -159,7 +160,7 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { } // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script. -func (handler *workerThread) waitForWorkerRequest() bool { +func (handler *workerThread) waitForWorkerRequest() (bool, any) { // unpin any memory left over from previous requests handler.thread.Unpin() @@ -195,7 +196,7 @@ func (handler *workerThread) waitForWorkerRequest() bool { C.frankenphp_reset_opcache() } - return false + return false, nil case fc = <-handler.thread.requestChan: case fc = <-handler.worker.requestChan: } @@ -205,23 +206,33 @@ func (handler *workerThread) waitForWorkerRequest() bool { logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", fc.request.RequestURI)) - return true + return true, fc.handlerParameters } // go_frankenphp_worker_handle_request_start is called at the start of every php request served. // //export go_frankenphp_worker_handle_request_start -func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool { +func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool, unsafe.Pointer) { handler := phpThreads[threadIndex].handler.(*workerThread) - return C.bool(handler.waitForWorkerRequest()) + hasRequest, parameters := handler.waitForWorkerRequest() + + if parameters != nil { + p := PHPValue(parameters) + handler.thread.Pin(p) + + return C.bool(hasRequest), p + } + + return C.bool(hasRequest), nil } // go_frankenphp_finish_worker_request is called at the end of every php request served. // //export go_frankenphp_finish_worker_request -func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) { +func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval) { thread := phpThreads[threadIndex] fc := thread.getRequestContext() + fc.handlerReturn = GoValue(unsafe.Pointer(retval)) fc.closeContext() thread.handler.(*workerThread).workerContext = nil From 712376f514a00c5c1f3023a08edc0a8e6426440e Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Wed, 17 Sep 2025 12:28:15 +0200 Subject: [PATCH 16/19] fix formatting Signed-off-by: Robert Landers --- frankenphp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frankenphp.c b/frankenphp.c index 165281f42..7ac7a74e5 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -475,7 +475,7 @@ PHP_FUNCTION(frankenphp_handle_request) { frankenphp_worker_request_shutdown(); go_frankenphp_finish_worker_request(thread_index, callback_ret); if (result.r1 != NULL) { - zval_ptr_dtor(result.r1); + zval_ptr_dtor(result.r1); } if (callback_ret != NULL) { zval_ptr_dtor(&retval); From 222d8c91c4de130ab31bba83bd354601777af9a5 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Wed, 17 Sep 2025 12:44:27 +0200 Subject: [PATCH 17/19] fix test compilation Signed-off-by: Robert Landers --- threadFramework_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/threadFramework_test.go b/threadFramework_test.go index e85460d44..7519d9abf 100644 --- a/threadFramework_test.go +++ b/threadFramework_test.go @@ -17,7 +17,7 @@ type mockWorkerExtension struct { fileName string env PreparedEnv minThreads int - requestChan chan *WorkerRequest + requestChan chan *WorkerRequest[any, any] activatedCount int drainCount int deactivatedCount int @@ -30,7 +30,7 @@ func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerEx fileName: fileName, env: make(PreparedEnv), minThreads: minThreads, - requestChan: make(chan *WorkerRequest, 10), // Buffer to avoid blocking + requestChan: make(chan *WorkerRequest[any, any], 10), // Buffer to avoid blocking } } @@ -68,11 +68,11 @@ func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) { m.deactivatedCount++ } -func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest { +func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest[any, any] { return <-m.requestChan } -func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest) { +func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest[any, any]) { m.requestChan <- r } @@ -115,10 +115,12 @@ func TestWorkerExtension(t *testing.T) { done := make(chan struct{}) // Inject the request into the worker through the extension - mockExt.InjectRequest(&WorkerRequest{ + mockExt.InjectRequest(&WorkerRequest[any, any]{ Request: req, Response: w, - Done: done, + AfterFunc: func(callbackReturn any) { + close(done) + }, }) // Wait for the request to be fully processed From 60591b82eda4e75421aaa5449876b0634c729fd8 Mon Sep 17 00:00:00 2001 From: Robert Landers Date: Wed, 17 Sep 2025 13:01:32 +0200 Subject: [PATCH 18/19] fix segfaults Signed-off-by: Robert Landers --- frankenphp.c | 2 +- threadworker.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/frankenphp.c b/frankenphp.c index 7ac7a74e5..5986320c8 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -456,7 +456,7 @@ PHP_FUNCTION(frankenphp_handle_request) { fci.size = sizeof fci; fci.retval = &retval; fci.params = result.r1; - fci.param_count = 1; + fci.param_count = result.r1 != NULL ? 1 : 0; if (zend_call_function(&fci, &fcc) == SUCCESS && Z_TYPE(retval) != IS_UNDEF) { callback_ret = &retval; diff --git a/threadworker.go b/threadworker.go index ebe7fa64d..745b8f882 100644 --- a/threadworker.go +++ b/threadworker.go @@ -232,7 +232,9 @@ func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool, func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval) { thread := phpThreads[threadIndex] fc := thread.getRequestContext() - fc.handlerReturn = GoValue(unsafe.Pointer(retval)) + if retval != nil { + fc.handlerReturn = GoValue(unsafe.Pointer(retval)) + } fc.closeContext() thread.handler.(*workerThread).workerContext = nil From ad15340d21284a5f42296409e449f9cc85d8ad2d Mon Sep 17 00:00:00 2001 From: Rob Landers Date: Wed, 17 Sep 2025 19:54:09 +0200 Subject: [PATCH 19/19] Update frankenphp.c MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Kévin Dunglas --- frankenphp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frankenphp.c b/frankenphp.c index 5986320c8..6b8438043 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -456,7 +456,7 @@ PHP_FUNCTION(frankenphp_handle_request) { fci.size = sizeof fci; fci.retval = &retval; fci.params = result.r1; - fci.param_count = result.r1 != NULL ? 1 : 0; + fci.param_count = result.r1 == NULL ? 0 : 1; if (zend_call_function(&fci, &fcc) == SUCCESS && Z_TYPE(retval) != IS_UNDEF) { callback_ret = &retval;