-
Notifications
You must be signed in to change notification settings - Fork 423
Add modular threads #1795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add modular threads #1795
Changes from all commits
73eb0be
46dd89c
fc2e183
f524630
2b81fb4
3dbd9ca
05c9433
d36bc29
1a1f345
93f81d8
fe6ce12
724f068
cf71f1d
2bfac92
eab9194
712376f
222d8c9
60591b8
ad15340
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,101 @@ | ||||||
| package frankenphp | ||||||
|
|
||||||
| import ( | ||||||
| "context" | ||||||
| "log/slog" | ||||||
| "net/http" | ||||||
| "sync" | ||||||
| ) | ||||||
|
|
||||||
| // EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on | ||||||
| // frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard | ||||||
| // http.Request that will be conferred to the underlying worker script. | ||||||
| // | ||||||
| // A worker script with the provided Name and FileName will be registered, along with the provided | ||||||
| // configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to | ||||||
| // reserve a minimum number of threads from the frankenphp thread pool. This number must be positive. | ||||||
| // These methods are only called once at startup, so register them in an init() function. | ||||||
| // | ||||||
| // When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId; | ||||||
| // this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool, | ||||||
| // you will receive a call to ThreadDrainNotification that will inform you of the threadId. | ||||||
| // After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called. | ||||||
| // | ||||||
| // Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with | ||||||
| // a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response. | ||||||
| // The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down. | ||||||
| // | ||||||
| // Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be | ||||||
| // allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more | ||||||
| // total threads). Don't be greedy. | ||||||
| type WorkerExtension interface { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? |
||||||
| Name() string | ||||||
| FileName() string | ||||||
| Env() PreparedEnv | ||||||
| GetMinThreads() int | ||||||
| ThreadActivatedNotification(threadId int) | ||||||
| ThreadDrainNotification(threadId int) | ||||||
| ThreadDeactivatedNotification(threadId int) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have a specific use case for these notifications? Otherwise it would allow moving this code to the If tasks are sent via request, then the workers or even frankenphp don't really need to know about 'external workers', they just need to handle the requests. It would even be cleaner to expose hooks for thread start/stop
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something like these worker options: func WithWorkerOnThreadActivation(hook func(threadId int)) WorkerOption
func WithWorkerOnThreadDrain(hook func(threadId int)) WorkerOption
func WithWorkerOnThreadDeactivation(hook func(threadId int)) WorkerOption
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It can be quite handy to know how many worker threads you have running (especially with autoscaling). That being said, it isn’t necessary. Good call about the decoupling.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe can we provide a default implementation as an embeddable struct, but this can be done in a follow up PR. |
||||||
| ProvideRequest() *WorkerRequest[any, any] | ||||||
| } | ||||||
|
|
||||||
| // EXPERIMENTAL | ||||||
| type WorkerRequest[P any, R any] struct { | ||||||
| // The request for your worker script to handle | ||||||
| Request *http.Request | ||||||
| // Response is a response writer that provides the output of the provided request, it must not be nil to access the request body | ||||||
| Response http.ResponseWriter | ||||||
|
Comment on lines
+44
to
+47
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should make sure that these fields are optional (they are useless for many use cases, and we could do some optimisations when they are nil). Could be done later too. |
||||||
| // CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback | ||||||
| CallbackParameters P | ||||||
| // AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter | ||||||
| AfterFunc func(callbackReturn R) | ||||||
| } | ||||||
|
|
||||||
| var externalWorkers = make(map[string]WorkerExtension) | ||||||
| var externalWorkerMutex sync.Mutex | ||||||
|
|
||||||
| // EXPERIMENTAL | ||||||
| func RegisterExternalWorker(worker WorkerExtension) { | ||||||
| externalWorkerMutex.Lock() | ||||||
| defer externalWorkerMutex.Unlock() | ||||||
|
|
||||||
| externalWorkers[worker.Name()] = worker | ||||||
| } | ||||||
|
|
||||||
| // startExternalWorkerPipe creates a pipe from an external worker to the main worker. | ||||||
| func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) { | ||||||
| for { | ||||||
| rq := externalWorker.ProvideRequest() | ||||||
|
|
||||||
| if rq == nil || rq.Request == nil { | ||||||
| logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) | ||||||
| continue | ||||||
| } | ||||||
|
|
||||||
| r := rq.Request | ||||||
| fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name)) | ||||||
| if err != nil { | ||||||
| logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err)) | ||||||
| continue | ||||||
| } | ||||||
|
|
||||||
| if fc, ok := fromContext(fr.Context()); ok { | ||||||
| fc.responseWriter = rq.Response | ||||||
| fc.handlerParameters = rq.CallbackParameters | ||||||
|
|
||||||
| // Queue the request and wait for completion if Done channel was provided | ||||||
| logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex)) | ||||||
|
|
||||||
| w.requestChan <- fc | ||||||
| if rq.AfterFunc != nil { | ||||||
| go func() { | ||||||
| <-fc.done | ||||||
|
|
||||||
| if rq.AfterFunc != nil { | ||||||
| rq.AfterFunc(fc.handlerReturn) | ||||||
| } | ||||||
| }() | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| package frankenphp | ||
|
|
||
| import ( | ||
| "io" | ||
| "net/http/httptest" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // mockWorkerExtension implements the WorkerExtension interface | ||
| type mockWorkerExtension struct { | ||
withinboredom marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| name string | ||
| fileName string | ||
| env PreparedEnv | ||
| minThreads int | ||
| requestChan chan *WorkerRequest[any, any] | ||
| activatedCount int | ||
| drainCount int | ||
| deactivatedCount int | ||
| mu sync.Mutex | ||
| } | ||
|
|
||
| func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension { | ||
| return &mockWorkerExtension{ | ||
| name: name, | ||
| fileName: fileName, | ||
| env: make(PreparedEnv), | ||
| minThreads: minThreads, | ||
| requestChan: make(chan *WorkerRequest[any, any], 10), // Buffer to avoid blocking | ||
| } | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) Name() string { | ||
| return m.name | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) FileName() string { | ||
| return m.fileName | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) Env() PreparedEnv { | ||
| return m.env | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) GetMinThreads() int { | ||
| return m.minThreads | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| m.activatedCount++ | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| m.drainCount++ | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| m.deactivatedCount++ | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest[any, any] { | ||
| return <-m.requestChan | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest[any, any]) { | ||
| m.requestChan <- r | ||
| } | ||
|
|
||
| func (m *mockWorkerExtension) GetActivatedCount() int { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| return m.activatedCount | ||
| } | ||
|
|
||
| func TestWorkerExtension(t *testing.T) { | ||
| // Create a mock extension | ||
| mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1) | ||
|
|
||
| // Register the mock extension | ||
| RegisterExternalWorker(mockExt) | ||
|
|
||
| // Clean up external workers after test to avoid interfering with other tests | ||
| defer func() { | ||
| delete(externalWorkers, mockExt.Name()) | ||
| }() | ||
|
|
||
| // Initialize FrankenPHP with a worker that has a different name than our extension | ||
| err := Init() | ||
| require.NoError(t, err) | ||
| defer Shutdown() | ||
|
|
||
| // Wait a bit for the worker to be ready | ||
| time.Sleep(100 * time.Millisecond) | ||
|
|
||
| // Verify that the extension's thread was activated | ||
| assert.GreaterOrEqual(t, mockExt.GetActivatedCount(), 1, "Thread should have been activated") | ||
|
|
||
| // Create a test request | ||
| req := httptest.NewRequest("GET", "http://example.com/test/?foo=bar", nil) | ||
| req.Header.Set("X-Test-Header", "test-value") | ||
|
|
||
| w := httptest.NewRecorder() | ||
|
|
||
| // Create a channel to signal when the request is done | ||
| done := make(chan struct{}) | ||
|
|
||
| // Inject the request into the worker through the extension | ||
| mockExt.InjectRequest(&WorkerRequest[any, any]{ | ||
| Request: req, | ||
| Response: w, | ||
| AfterFunc: func(callbackReturn any) { | ||
| close(done) | ||
| }, | ||
| }) | ||
|
|
||
| // Wait for the request to be fully processed | ||
| <-done | ||
|
|
||
| // Check the response - now safe from race conditions | ||
| resp := w.Result() | ||
| body, _ := io.ReadAll(resp.Body) | ||
|
|
||
| // The worker.php script should output information about the request | ||
| // We're just checking that we got a response, not the specific content | ||
| assert.NotEmpty(t, body, "Response body should not be empty") | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
customworker.go?