From 90bab574717adc66c96eb96f84b3302350c80290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 10 Oct 2025 18:16:50 +0200 Subject: [PATCH 1/2] refactor: improve Worker public API and docs --- frankenphp.go | 2 +- threadworker.go | 8 ++--- workerextension.go | 79 ++++++++++++++++++++++++----------------- workerextension_test.go | 7 +++- 4 files changed, 57 insertions(+), 39 deletions(-) diff --git a/frankenphp.go b/frankenphp.go index 222bc440e..c801e7a95 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -216,7 +216,7 @@ func Init(options ...Option) error { // add registered external workers for _, ew := range extensionWorkers { - options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.GetMinThreads(), WithWorkerEnv(ew.Env()))) + options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.MinThreads(), WithWorkerEnv(ew.Env()))) } opt := &opt{} diff --git a/threadworker.go b/threadworker.go index fabcc7c27..c3fbccd40 100644 --- a/threadworker.go +++ b/threadworker.go @@ -46,26 +46,26 @@ func (handler *workerThread) beforeScriptExecution() string { switch handler.state.get() { case stateTransitionRequested: if handler.externalWorker != nil { - handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex) + handler.externalWorker.OnServerShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) return handler.thread.transitionToNewHandler() case stateRestarting: if handler.externalWorker != nil { - handler.externalWorker.ThreadDrainNotification(handler.thread.threadIndex) + handler.externalWorker.OnShutdown(handler.thread.threadIndex) } handler.state.set(stateYielding) handler.state.waitFor(stateReady, stateShuttingDown) return handler.beforeScriptExecution() case stateReady, stateTransitionComplete: if handler.externalWorker != nil { - handler.externalWorker.ThreadActivatedNotification(handler.thread.threadIndex) + handler.externalWorker.OnReady(handler.thread.threadIndex) } setupWorkerScript(handler, handler.worker) return handler.worker.fileName case stateShuttingDown: if handler.externalWorker != nil { - handler.externalWorker.ThreadDeactivatedNotification(handler.thread.threadIndex) + handler.externalWorker.OnServerShutdown(handler.thread.threadIndex) } handler.worker.detachThread(handler.thread) // signal to stop diff --git a/workerextension.go b/workerextension.go index 4e7c29d5f..a343e4dd8 100644 --- a/workerextension.go +++ b/workerextension.go @@ -8,46 +8,57 @@ import ( "sync/atomic" ) -// EXPERIMENTAL: Worker allows you to register a worker where instead of calling FrankenPHP handlers on -// frankenphp_handle_request(), the ProvideRequest method is called. You may provide a standard -// http.Request that will be conferred to the underlying worker script. +// EXPERIMENTAL: Worker allows you to register a worker where, instead of calling FrankenPHP handlers on +// frankenphp_handle_request(), the GetRequest method is called. +// +// You may provide an http.Request that will be conferred to the underlying worker script, +// or custom parameters that will be passed to frankenphp_handle_request(). +// +// After the execution of frankenphp_handle_request(), the return value WorkerRequest.AfterFunc will be called, +// with the optional return value of the callback passed as parameter. // // A worker script with the provided Name and FileName will be registered, along with the provided -// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to -// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive. -// These methods are only called once at startup, so register them in an init() function. +// configuration. You can also provide any environment variables that you want through Env. // -// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId; -// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool, -// you will receive a call to ThreadDrainNotification that will inform you of the threadId. -// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called. +// Name() and FileName() are only called once at startup, so register them in an init() function. // -// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with -// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response. -// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. +// Workers are designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. // -// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be -// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more +// Extension workers receive the lowest priority when determining thread allocations. If MinThreads cannot be +// allocated, then FrankenPHP will panic and provide this information to the user (who will need to allocate more // total threads). Don't be greedy. type Worker interface { + // Name returns the worker name Name() string + // FileName returns the PHP script filename FileName() string + // Env returns the environment variables available in the worker script. Env() PreparedEnv - GetMinThreads() int - ThreadActivatedNotification(threadId int) - ThreadDrainNotification(threadId int) - ThreadDeactivatedNotification(threadId int) - ProvideRequest() *WorkerRequest - InjectRequest(r *WorkerRequest) + // MinThreads returns the minimum number of threads to reserve from the FrankenPHP thread pool. + // This number must be positive. + MinThreads() int + // OnReady is called when the worker is assigned to a thread and receives an opaque thread ID as parameter. + // This is a time for setting up any per-thread resources. + OnReady(threadId int) + // OnShutdown is called when the worker is shutting down and receives an opaque thread ID as parameter. + // This is a time for cleaning up any per-thread resources. + OnShutdown(threadId int) + // OnServerShutdown is called when FrankenPHP is shutting down. + OnServerShutdown(threadId int) + // GetRequest is called once at least one thread is ready. + // The returned request will be passed to the worker script. + GetRequest() *WorkerRequest + // SendRequest sends a request to the worker script. The callback function of frankenphp_handle_request() will be called. + SendRequest(r *WorkerRequest) } -// EXPERIMENTAL +// EXPERIMENTAL: WorkerRequest represents a request to pass to a worker script. type WorkerRequest struct { - // The request for your worker script to handle + // Request is an optional HTTP request for your worker script to handle Request *http.Request - // Response is a response writer that provides the output of the provided request, it must not be nil to access the request body + // Response is an optional response writer that provides the output of the provided request, it must not be nil to access the request body Response http.ResponseWriter - // CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback + // CallbackParameters is an optional field that will be converted in PHP types or left as-is if it's an unsafe.Pointer and passed as parameter to the PHP callback CallbackParameters any // AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter AfterFunc func(callbackReturn any) @@ -56,7 +67,7 @@ type WorkerRequest struct { var extensionWorkers = make(map[string]Worker) var extensionWorkersMutex sync.Mutex -// EXPERIMENTAL +// EXPERIMENTAL: RegisterWorker registers a custom worker script. func RegisterWorker(worker Worker) { extensionWorkersMutex.Lock() defer extensionWorkersMutex.Unlock() @@ -67,7 +78,7 @@ func RegisterWorker(worker Worker) { // startWorker creates a pipe from a worker to the main worker. func startWorker(w *worker, extensionWorker Worker, thread *phpThread) { for { - rq := extensionWorker.ProvideRequest() + rq := extensionWorker.GetRequest() var fc *frankenPHPContext if rq.Request == nil { @@ -107,6 +118,8 @@ func startWorker(w *worker, extensionWorker Worker, thread *phpThread) { } } +// EXPERIMENTAL: NewWorker creates a Worker instance to embed in a custom struct implementing the Worker interface. +// The returned instance may be sufficient on its own for simple use cases. func NewWorker(name, fileName string, minThreads int, env PreparedEnv) Worker { return &defaultWorker{ name: name, @@ -141,27 +154,27 @@ func (w *defaultWorker) Env() PreparedEnv { return w.env } -func (w *defaultWorker) GetMinThreads() int { +func (w *defaultWorker) MinThreads() int { return w.minThreads } -func (w *defaultWorker) ThreadActivatedNotification(_ int) { +func (w *defaultWorker) OnReady(_ int) { w.activatedCount.Add(1) } -func (w *defaultWorker) ThreadDrainNotification(_ int) { +func (w *defaultWorker) OnShutdown(_ int) { w.drainCount.Add(1) } -func (w *defaultWorker) ThreadDeactivatedNotification(_ int) { +func (w *defaultWorker) OnServerShutdown(_ int) { w.drainCount.Add(-1) w.activatedCount.Add(-1) } -func (w *defaultWorker) ProvideRequest() *WorkerRequest { +func (w *defaultWorker) GetRequest() *WorkerRequest { return <-w.requestChan } -func (w *defaultWorker) InjectRequest(r *WorkerRequest) { +func (w *defaultWorker) SendRequest(r *WorkerRequest) { w.requestChan <- r } diff --git a/workerextension_test.go b/workerextension_test.go index 2a900f6ed..d91f2b6ed 100644 --- a/workerextension_test.go +++ b/workerextension_test.go @@ -15,6 +15,11 @@ type mockWorker struct { Worker } +func (*mockWorker) OnShutdown(threadId int) { + //TODO implement me + panic("implement me") +} + func TestWorkerExtension(t *testing.T) { // Create a mock worker extension mockExt := &mockWorker{ @@ -50,7 +55,7 @@ func TestWorkerExtension(t *testing.T) { done := make(chan struct{}) // Inject the request into the worker through the extension - mockExt.InjectRequest(&WorkerRequest{ + mockExt.SendRequest(&WorkerRequest{ Request: req, Response: w, AfterFunc: func(callbackReturn any) { From 31192aed2b5e06c39f36b983ea773def397b5d6a Mon Sep 17 00:00:00 2001 From: Alexander Stecher <45872305+AlliBalliBaba@users.noreply.github.com> Date: Tue, 28 Oct 2025 20:37:20 +0100 Subject: [PATCH 2/2] suggestion: external worker api (#1928) * Cleaner request apis. --- frankenphp.go | 21 +++- options.go | 38 +++++++ testdata/message-worker.php | 8 ++ threadworker.go | 20 ++-- worker.go | 10 +- workerextension.go | 200 ++++++++++++------------------------ workerextension_test.go | 122 +++++++++++++--------- 7 files changed, 217 insertions(+), 202 deletions(-) create mode 100644 testdata/message-worker.php diff --git a/frankenphp.go b/frankenphp.go index c801e7a95..a5fbfc0ca 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -52,7 +52,8 @@ var ( ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") - isRunning bool + isRunning bool + onServerShutdown []func() loggerMu sync.RWMutex logger *slog.Logger @@ -216,7 +217,7 @@ func Init(options ...Option) error { // add registered external workers for _, ew := range extensionWorkers { - options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.MinThreads(), WithWorkerEnv(ew.Env()))) + options = append(options, WithWorkers(ew.name, ew.fileName, ew.num, ew.options...)) } opt := &opt{} @@ -291,6 +292,17 @@ func Init(options ...Option) error { logger.LogAttrs(ctx, slog.LevelInfo, "embedded PHP app 📦", slog.String("path", EmbeddedAppPath)) } + // register the startup/shutdown hooks (mainly useful for extensions) + onServerShutdown = nil + for _, w := range opt.workers { + if w.onServerStartup != nil { + w.onServerStartup() + } + if w.onServerShutdown != nil { + onServerShutdown = append(onServerShutdown, w.onServerShutdown) + } + } + return nil } @@ -300,6 +312,11 @@ func Shutdown() { return } + // call the shutdown hooks (mainly useful for extensions) + for _, fn := range onServerShutdown { + fn() + } + drainWatcher() drainAutoScaling() drainPHPThreads() diff --git a/options.go b/options.go index 18c5ba20f..befe3a7fb 100644 --- a/options.go +++ b/options.go @@ -35,6 +35,10 @@ type workerOpt struct { env PreparedEnv watch []string maxConsecutiveFailures int + onThreadReady func(int) + onThreadShutdown func(int) + onServerStartup func() + onServerShutdown func() } // WithNumThreads configures the number of PHP threads to start. @@ -116,6 +120,40 @@ func WithWorkerMaxFailures(maxFailures int) WorkerOption { } } +func WithWorkerOnReady(f func(int)) WorkerOption { + return func(w *workerOpt) error { + w.onThreadReady = f + + return nil + } +} + +func WithWorkerOnShutdown(f func(int)) WorkerOption { + return func(w *workerOpt) error { + w.onThreadShutdown = f + + return nil + } +} + +// WithWorkerOnServerStartup adds a function to be called right after server startup. Useful for extensions. +func WithWorkerOnServerStartup(f func()) WorkerOption { + return func(w *workerOpt) error { + w.onServerStartup = f + + return nil + } +} + +// WithWorkerOnServerShutdown adds a function to be called right before server shutdown. Useful for extensions. +func WithWorkerOnServerShutdown(f func()) WorkerOption { + return func(w *workerOpt) error { + w.onServerShutdown = f + + return nil + } +} + // WithLogger configures the global logger to use. func WithLogger(l *slog.Logger) Option { return func(o *opt) error { diff --git a/testdata/message-worker.php b/testdata/message-worker.php new file mode 100644 index 000000000..a73f9fa64 --- /dev/null +++ b/testdata/message-worker.php @@ -0,0 +1,8 @@ +