From fc776513a0485d3d3bbdf3756a39af09d6e9b781 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 00:00:51 +0200 Subject: [PATCH 01/19] Adds latency tracking wip. --- cgi.go | 9 +-- context.go | 8 ++ debugstate.go | 4 +- frankenphp.go | 2 + latencytracking.go | 157 ++++++++++++++++++++++++++++++++++++++++ latencytracking_test.go | 70 ++++++++++++++++++ phpthread.go | 15 ++-- scaling.go | 12 +++ worker.go | 26 ++++++- 9 files changed, 284 insertions(+), 19 deletions(-) create mode 100644 latencytracking.go create mode 100644 latencytracking_test.go diff --git a/cgi.go b/cgi.go index f9aae8690..99bf8f4bc 100644 --- a/cgi.go +++ b/cgi.go @@ -131,13 +131,6 @@ func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVa serverPort := reqPort contentLength := request.Header.Get("Content-Length") - var requestURI string - if fc.originalRequest != nil { - requestURI = fc.originalRequest.URL.RequestURI() - } else { - requestURI = request.URL.RequestURI() - } - C.frankenphp_register_bulk( trackVarsArray, packCgiVariable(keys["REMOTE_ADDR"], ip), @@ -167,7 +160,7 @@ func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVa packCgiVariable(keys["AUTH_TYPE"], ""), packCgiVariable(keys["REMOTE_IDENT"], ""), // Request uri of the original request - packCgiVariable(keys["REQUEST_URI"], requestURI), + packCgiVariable(keys["REQUEST_URI"], fc.getOriginalRequest().URL.RequestURI()), packCgiVariable(keys["SSL_CIPHER"], sslCipher), ) diff --git a/context.go b/context.go index 65aee5b75..a67bf50dd 100644 --- a/context.go +++ b/context.go @@ -138,6 +138,14 @@ func (fc *frankenPHPContext) clientHasClosed() bool { } } +func (fc *frankenPHPContext) getOriginalRequest() *http.Request { + if fc.originalRequest != nil { + return fc.originalRequest + } + + return fc.request +} + // reject sends a response with the given status code and message func (fc *frankenPHPContext) reject(statusCode int, message string) { if fc.isDone { diff --git 
a/debugstate.go b/debugstate.go index a7941ac79..f9c83ca57 100644 --- a/debugstate.go +++ b/debugstate.go @@ -6,8 +6,8 @@ type ThreadDebugState struct { Name string State string IsWaiting bool - IsBusy bool WaitingSinceMilliseconds int64 + IsLowLatency bool } // EXPERIMENTAL: FrankenPHPDebugState prints the state of all PHP threads - debugging purposes only @@ -40,7 +40,7 @@ func threadDebugState(thread *phpThread) ThreadDebugState { Name: thread.name(), State: thread.state.name(), IsWaiting: thread.state.isInWaitingState(), - IsBusy: !thread.state.isInWaitingState(), WaitingSinceMilliseconds: thread.state.waitTime(), + IsLowLatency: thread.isLowLatencyThread, } } diff --git a/frankenphp.go b/frankenphp.go index 58cd95b89..ef3789bf0 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -278,6 +278,8 @@ func Init(options ...Option) error { return err } + initLatencyTracking() + initAutoScaling(mainThread) ctx := context.Background() diff --git a/latencytracking.go b/latencytracking.go new file mode 100644 index 000000000..91283bcf2 --- /dev/null +++ b/latencytracking.go @@ -0,0 +1,157 @@ +package frankenphp + +import ( + "math/rand" + "regexp" + "strings" + "sync" + "sync/atomic" + "time" +) + +// limit of tracked path children +const childLimitPerNode = 10 + +// path parts longer than this are considered a wildcard +const charLimitWildcard = 50 + +var ( + // requests taking longer than this are considered slow (var for tests) + slowRequestThreshold = 1000 * time.Millisecond + // % of autoscaled threads that are not marked as low latency (var for tests) + slowThreadPercentile = 80 + + rootNode = &pathNode{children: make(map[string]*pathNode), mu: sync.RWMutex{}} + enableLatencyTracking = atomic.Bool{} + numRe = regexp.MustCompile(`^\d+$`) + uuidRe = regexp.MustCompile(`^[a-f0-9-]{36}$`) +) + +type pathNode struct { + children map[string]*pathNode + latency time.Duration + mu sync.RWMutex +} + +func (n *pathNode) commitLatency(time time.Duration) { + n.mu.Lock() + 
n.latency = n.latency/2 + time/2 + n.mu.Unlock() +} + +func (n *pathNode) getLatency() time.Duration { + n.mu.RLock() + defer n.mu.RUnlock() + return n.latency +} + +func (n *pathNode) addChild(child *pathNode, path string) { + n.mu.Lock() + if len(n.children) <= childLimitPerNode { + n.children[path] = child + } + n.mu.Unlock() +} + +func (n *pathNode) getChild(path string) (*pathNode, bool) { + n.mu.RLock() + defer n.mu.RUnlock() + node, ok := n.children[path] + return node, ok +} + +func initLatencyTracking() { + enableLatencyTracking.Store(false) +} + +// get a random thread that is not marked as low latency +func getRandomSlowThread(worker *worker) *phpThread { + worker.threadMutex.RLock() + defer worker.threadMutex.RUnlock() + + slowThreads := []*phpThread{} + for _, thread := range worker.threads { + if !thread.isLowLatencyThread { + slowThreads = append(slowThreads, thread) + } + } + + // if there are no slow threads, return a random thread + if len(slowThreads) == 0 { + return worker.threads[rand.Intn(len(worker.threads))] + } + + return slowThreads[rand.Intn(len(slowThreads))] +} + +func recordSlowRequest(fc *frankenPHPContext, duration time.Duration) { + if duration > slowRequestThreshold { + recordRequestLatency(fc, duration) + } +} + +// record a slow request in the path tree +func recordRequestLatency(fc *frankenPHPContext, duration time.Duration) { + request := fc.getOriginalRequest() + parts := strings.Split(request.URL.Path, "/") + node := rootNode + + logger.Debug("slow request detected", "path", request.URL.Path, "duration", duration) + + for _, part := range parts { + if part == "" { + continue + } + if isWildcardPathPart(part) { + part = "*" + } + + childNode, exists := node.getChild(part) + if !exists { + childNode = &pathNode{ + children: make(map[string]*pathNode), + mu: sync.RWMutex{}, + } + node.addChild(childNode, part) + } + node = childNode + node.commitLatency(duration) + } +} + +// determine if a request is likely to be high latency 
based on the request path +func isHighLatencyRequest(fc *frankenPHPContext) bool { + request := fc.getOriginalRequest() + parts := strings.Split(request.URL.Path, "/") + node := rootNode + + for _, part := range parts { + if part == "" { + continue + } + + childNode, exists := node.getChild(part) + if exists { + node = childNode + continue + } + childNode, exists = node.getChild("*") + if exists { + node = childNode + continue + } + + return false + } + + return node.getLatency() > slowRequestThreshold +} + +// determine if a path part is a wildcard +// a path part is a wildcard if: +// - it is longer than charLimitWildcard +// - it is a number +// - it is a uuid +func isWildcardPathPart(part string) bool { + return len(part) > charLimitWildcard || numRe.MatchString(part) || uuidRe.MatchString(part) +} diff --git a/latencytracking_test.go b/latencytracking_test.go new file mode 100644 index 000000000..5ac272f66 --- /dev/null +++ b/latencytracking_test.go @@ -0,0 +1,70 @@ +package frankenphp + +import ( + "log/slog" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap/exp/zapslog" + "go.uber.org/zap/zaptest" +) + +func TestIsWildcardPathPart(t *testing.T) { + tests := []struct { + input string + expected bool + }{ + {"123", true}, + {"*123", false}, + {"users", false}, + {"550e8400-e29b-41d4-a716-446655440000", true}, // uuid + {"550e8400-e29b-41d4-a716-44665544000Z", false}, + // very long string + {"asdfghjklkjhgfdsasdfghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgf", true}, + } + + for _, test := range tests { + isWildcard := isWildcardPathPart(test.input) + assert.Equal(t, test.expected, isWildcard, "isWildcard(%q) = %q; want %q", test.input, isWildcard, test.expected) + } +} + +func assertGetRequest(t *testing.T, url string, expectedBodyContains string) { + r := httptest.NewRequest("GET", url, nil) + w := httptest.NewRecorder() + req, err := NewRequestWithContext(r, WithWorkerName("worker")) + 
assert.NoError(t, err) + assert.NoError(t, ServeHTTP(w, req)) + assert.Contains(t, w.Body.String(), expectedBodyContains) +} + +func TestTunnelLowLatencyRequest(t *testing.T) { + assert.NoError(t, Init( + WithWorkers("worker", "testdata/sleep.php", 1), + WithNumThreads(2), + WithMaxThreads(3), + WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))), + )) + defer Shutdown() + + // record request path as slow, manipulate thresholds to make it easy to trigger + slowRequestThreshold = 1 * time.Millisecond + slowThreadPercentile = 0 + scaleWorkerThread(getWorkerByName("worker")) // the scaled thread should be low-latency only + assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms") + assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms") + + // send 2 blocking requests that occupy all threads + go assertGetRequest(t, "/slow/123/path?sleep=500", "slept for 500 ms") + go assertGetRequest(t, "/slow/123/path?sleep=500", "slept for 500 ms") + time.Sleep(time.Millisecond * 100) // enough time to receive the requests + + // send a low latency request, it should not be blocked by the slow requests + start := time.Now() + assertGetRequest(t, "/fast/123/path?sleep=0", "slept for 0 ms") + duration := time.Since(start) + + assert.Less(t, duration.Milliseconds(), int64(100), "the low latency request should not be blocked by the slow requests") +} diff --git a/phpthread.go b/phpthread.go index a60aa8f0a..0c7140b05 100644 --- a/phpthread.go +++ b/phpthread.go @@ -15,13 +15,14 @@ import ( // identified by the index in the phpThreads slice type phpThread struct { runtime.Pinner - threadIndex int - requestChan chan *frankenPHPContext - drainChan chan struct{} - handlerMu sync.Mutex - handler threadHandler - state *threadState - sandboxedEnv map[string]*C.zend_string + threadIndex int + requestChan chan *frankenPHPContext + drainChan chan struct{} + handlerMu sync.Mutex + handler threadHandler + state *threadState + sandboxedEnv map[string]*C.zend_string 
+ isLowLatencyThread bool } // interface that defines how the callbacks from the C thread should be handled diff --git a/scaling.go b/scaling.go index 57e6c598b..07a21c5b6 100644 --- a/scaling.go +++ b/scaling.go @@ -73,6 +73,10 @@ func addWorkerThread(worker *worker) (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } + if isNearThreadLimit() { + enableLatencyTracking.Store(true) + thread.isLowLatencyThread = true + } convertToWorkerThread(thread, worker) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -220,4 +224,12 @@ func deactivateThreads() { // continue // } } + + if enableLatencyTracking.Load() && !isNearThreadLimit() { + enableLatencyTracking.Store(false) + } +} + +func isNearThreadLimit() bool { + return len(autoScaledThreads) >= cap(autoScaledThreads)*slowThreadPercentile/100 } diff --git a/worker.go b/worker.go index 04772fa4a..e0a79008f 100644 --- a/worker.go +++ b/worker.go @@ -217,6 +217,20 @@ func (worker *worker) countThreads() int { func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.StartWorkerRequest(worker.name) + // if the server is experiencing high latency, dispatch requests that are + // expected to be slow directly to a number of restricted threads + if enableLatencyTracking.Load() && isHighLatencyRequest(fc) { + getRandomSlowThread(worker).requestChan <- fc + stallDuration := time.Since(fc.startedAt) + <-fc.done + + requestTime := time.Since(fc.startedAt) + metrics.StopWorkerRequest(worker.name, requestTime) + recordRequestLatency(fc, requestTime-stallDuration) + + return + } + // dispatch requests to all worker threads in order worker.threadMutex.RLock() for _, thread := range worker.threads { @@ -224,7 +238,11 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { case thread.requestChan <- fc: worker.threadMutex.RUnlock() <-fc.done - metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt)) + + requestTime := time.Since(fc.startedAt) + 
metrics.StopWorkerRequest(worker.name, requestTime) + recordSlowRequest(fc, requestTime) + return default: // thread is busy, continue @@ -238,8 +256,12 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { select { case worker.requestChan <- fc: metrics.DequeuedWorkerRequest(worker.name) + stallDuration := time.Since(fc.startedAt) <-fc.done - metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt)) + requestTime := time.Since(fc.startedAt) + metrics.StopWorkerRequest(worker.name, requestTime) + recordSlowRequest(fc, requestTime-stallDuration) + return case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread From e8b1cf8532444361a8f8b699a7098ca1d84269b1 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 22:44:18 +0200 Subject: [PATCH 02/19] Adds regular threads. --- threadregular.go | 48 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/threadregular.go b/threadregular.go index 88cef7e79..a9003e074 100644 --- a/threadregular.go +++ b/threadregular.go @@ -2,6 +2,7 @@ package frankenphp import ( "sync" + "time" ) // representation of a non-worker PHP thread @@ -15,7 +16,7 @@ type regularThread struct { var ( regularThreads []*phpThread - regularThreadMu = &sync.RWMutex{} + regularThreadMu = sync.RWMutex{} regularRequestChan chan *frankenPHPContext ) @@ -59,16 +60,17 @@ func (handler *regularThread) name() string { } func (handler *regularThread) waitForRequest() string { - // clear any previously sandboxed env - clearSandboxedEnv(handler.thread) + thread := handler.thread + clearSandboxedEnv(thread) // clear any previously sandboxed env handler.state.markAsWaiting(true) var fc *frankenPHPContext select { - case <-handler.thread.drainChan: + case <-thread.drainChan: // go back to beforeScriptExecution return handler.beforeScriptExecution() + case fc = <-thread.requestChan: case fc = <-regularRequestChan: } @@ -86,24 +88,46 @@ func (handler 
*regularThread) afterRequest() { func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { metrics.StartRequest() - select { - case regularRequestChan <- fc: - // a thread was available to handle the request immediately - <-fc.done - metrics.StopRequest() + + if latencyTrackingEnabled.Load() && isHighLatencyRequest(fc) { + regularThreadMu.RLock() + slowThread := getRandomSlowThread(regularThreads) + regularThreadMu.RUnlock() + stallRegularPHPRequests(fc, slowThread.requestChan) + return - default: - // no thread was available } + regularThreadMu.RLock() + for _, thread := range regularThreads { + select { + case thread.requestChan <- fc: + regularThreadMu.RUnlock() + <-fc.done + metrics.StopRequest() + recordSlowRequest(fc, time.Since(fc.startedAt)) + + return + default: + // thread is busy, continue + } + } + regularThreadMu.RUnlock() + // if no thread was available, mark the request as queued and fan it out to all threads + stallRegularPHPRequests(fc, regularRequestChan) +} + +func stallRegularPHPRequests(fc *frankenPHPContext, requestChan chan *frankenPHPContext) { metrics.QueuedRequest() for { select { - case regularRequestChan <- fc: + case requestChan <- fc: metrics.DequeuedRequest() + stallTime := time.Since(fc.startedAt) <-fc.done metrics.StopRequest() + recordSlowRequest(fc, time.Since(fc.startedAt)-stallTime) return case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread From cb1652d63548c15f85ef9819c2fad73357c8fde9 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 22:44:32 +0200 Subject: [PATCH 03/19] Cleans up logic. 
--- latencytracking.go | 186 +++++++++++++++++++++------------------- latencytracking_test.go | 86 +++++++++++++++---- scaling.go | 14 +-- worker.go | 20 +++-- 4 files changed, 177 insertions(+), 129 deletions(-) diff --git a/latencytracking.go b/latencytracking.go index 91283bcf2..0a340a31c 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -10,7 +10,7 @@ import ( ) // limit of tracked path children -const childLimitPerNode = 10 +const maxTrackedPaths = 1000 // path parts longer than this are considered a wildcard const charLimitWildcard = 50 @@ -21,67 +21,62 @@ var ( // % of autoscaled threads that are not marked as low latency (var for tests) slowThreadPercentile = 80 - rootNode = &pathNode{children: make(map[string]*pathNode), mu: sync.RWMutex{}} - enableLatencyTracking = atomic.Bool{} - numRe = regexp.MustCompile(`^\d+$`) - uuidRe = regexp.MustCompile(`^[a-f0-9-]{36}$`) + latencyTrackingEnabled = atomic.Bool{} + slowRequestsMu = sync.RWMutex{} + slowRequestPaths map[string]time.Duration + numRe = regexp.MustCompile(`^\d+$`) + uuidRe = regexp.MustCompile(`^[a-f0-9-]{36}$`) ) -type pathNode struct { - children map[string]*pathNode - latency time.Duration - mu sync.RWMutex -} - -func (n *pathNode) commitLatency(time time.Duration) { - n.mu.Lock() - n.latency = n.latency/2 + time/2 - n.mu.Unlock() -} - -func (n *pathNode) getLatency() time.Duration { - n.mu.RLock() - defer n.mu.RUnlock() - return n.latency +func initLatencyTracking() { + latencyTrackingEnabled.Store(false) + slowRequestPaths = make(map[string]time.Duration) } -func (n *pathNode) addChild(child *pathNode, path string) { - n.mu.Lock() - if len(n.children) <= childLimitPerNode { - n.children[path] = child +// trigger latency tracking while scaling threads +func triggerLatencyTrackingIfNeeded(thread *phpThread) { + if isNearThreadLimit() { + latencyTrackingEnabled.Store(true) + thread.isLowLatencyThread = true + logger.Debug("low latency thread spawned") } - n.mu.Unlock() } -func (n 
*pathNode) getChild(path string) (*pathNode, bool) { - n.mu.RLock() - defer n.mu.RUnlock() - node, ok := n.children[path] - return node, ok +func stopLatencyTrackingIfNeeded() { + if latencyTrackingEnabled.Load() && !isNearThreadLimit() { + latencyTrackingEnabled.Store(false) + logger.Debug("latency tracking disabled") + } } -func initLatencyTracking() { - enableLatencyTracking.Store(false) +func isNearThreadLimit() bool { + return len(autoScaledThreads) >= cap(autoScaledThreads)*slowThreadPercentile/100 } // get a random thread that is not marked as low latency -func getRandomSlowThread(worker *worker) *phpThread { - worker.threadMutex.RLock() - defer worker.threadMutex.RUnlock() - - slowThreads := []*phpThread{} - for _, thread := range worker.threads { +func getRandomSlowThread(threads []*phpThread) *phpThread { + slowThreadCount := 0 + for _, thread := range threads { if !thread.isLowLatencyThread { - slowThreads = append(slowThreads, thread) + slowThreadCount++ } } - // if there are no slow threads, return a random thread - if len(slowThreads) == 0 { - return worker.threads[rand.Intn(len(worker.threads))] + if slowThreadCount == 0 { + panic("there must always be at least one slow thread") + } + + slowThreadNum := rand.Intn(slowThreadCount) + for _, thread := range threads { + if !thread.isLowLatencyThread { + if slowThreadNum == 0 { + return thread + } + slowThreadNum-- + } } - return slowThreads[rand.Intn(len(slowThreads))] + panic("there must always be at least one slow thread") } func recordSlowRequest(fc *frankenPHPContext, duration time.Duration) { @@ -90,68 +85,79 @@ func recordSlowRequest(fc *frankenPHPContext, duration time.Duration) { } } -// record a slow request in the path tree +// record a slow request path func recordRequestLatency(fc *frankenPHPContext, duration time.Duration) { request := fc.getOriginalRequest() - parts := strings.Split(request.URL.Path, "/") - node := rootNode + normalizedPath := normalizePath(request.URL.Path) + 
logger.Debug("slow request detected", "path", normalizedPath, "duration", duration) - logger.Debug("slow request detected", "path", request.URL.Path, "duration", duration) + slowRequestsMu.Lock() - for _, part := range parts { - if part == "" { - continue - } - if isWildcardPathPart(part) { - part = "*" - } - - childNode, exists := node.getChild(part) - if !exists { - childNode = &pathNode{ - children: make(map[string]*pathNode), - mu: sync.RWMutex{}, - } - node.addChild(childNode, part) - } - node = childNode - node.commitLatency(duration) + // if too many slow paths are tracked, clear the map + if len(slowRequestPaths) > maxTrackedPaths { + slowRequestPaths = make(map[string]time.Duration) } + recordedLatency, _ := slowRequestPaths[normalizedPath] + // average the recorded latency with the new latency + slowRequestPaths[normalizedPath] = duration/2 + recordedLatency/2 + + slowRequestsMu.Unlock() } // determine if a request is likely to be high latency based on the request path func isHighLatencyRequest(fc *frankenPHPContext) bool { request := fc.getOriginalRequest() - parts := strings.Split(request.URL.Path, "/") - node := rootNode + normalizedPath := normalizePath(request.URL.Path) - for _, part := range parts { - if part == "" { - continue - } + slowRequestsMu.RLock() + latency, exists := slowRequestPaths[normalizedPath] + slowRequestsMu.RUnlock() - childNode, exists := node.getChild(part) - if exists { - node = childNode - continue - } - childNode, exists = node.getChild("*") - if exists { - node = childNode - continue - } + if exists { + return latency > slowRequestThreshold + } - return false + return false +} + +// TODO: query? 
+func normalizePath(path string) string { + pathLen := len(path) + if pathLen > 1 && path[pathLen-1] == '/' { + pathLen-- // ignore trailing slash for processing } - return node.getLatency() > slowRequestThreshold + var b strings.Builder + b.Grow(len(path)) // pre-allocate at least original size + start := 0 + for i := 0; i <= pathLen; i++ { + if i == pathLen || path[i] == '/' { + if i > start { + seg := path[start:i] + b.WriteString(normalizePathPart(seg)) + } + if i < pathLen { + b.WriteByte('/') + } + start = i + 1 + } + } + return b.String() } // determine if a path part is a wildcard -// a path part is a wildcard if: -// - it is longer than charLimitWildcard -// - it is a number -// - it is a uuid -func isWildcardPathPart(part string) bool { - return len(part) > charLimitWildcard || numRe.MatchString(part) || uuidRe.MatchString(part) +func normalizePathPart(part string) string { + if len(part) > charLimitWildcard { + return ":slug" + } + + if numRe.MatchString(part) { + return ":id" + } + + if uuidRe.MatchString(part) { + return ":uuid" + } + + return part } diff --git a/latencytracking_test.go b/latencytracking_test.go index 5ac272f66..23bf480f1 100644 --- a/latencytracking_test.go +++ b/latencytracking_test.go @@ -3,6 +3,7 @@ package frankenphp import ( "log/slog" "net/http/httptest" + "sync" "testing" "time" @@ -11,36 +12,36 @@ import ( "go.uber.org/zap/zaptest" ) -func TestIsWildcardPathPart(t *testing.T) { +func TestNormalizePath(t *testing.T) { tests := []struct { input string - expected bool + expected string }{ - {"123", true}, - {"*123", false}, - {"users", false}, - {"550e8400-e29b-41d4-a716-446655440000", true}, // uuid - {"550e8400-e29b-41d4-a716-44665544000Z", false}, - // very long string - {"asdfghjklkjhgfdsasdfghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgf", true}, + {"123", ":id"}, + {"/*123/456/asd", "/*123/:id/asd"}, + {"/users/", "/users"}, + {"/product/550e8400-e29b-41d4-a716-446655440000/", "/product/:uuid"}, + 
{"/not/a/uuid/550e8400-e29b-41d4-a716-44665544000Z/", "/not/a/uuid/550e8400-e29b-41d4-a716-44665544000Z"}, + {"/page/asdfghjk-lkjhgfdsasdfghjkjhgf-dsasdfghjkjhgfdsasdf-ghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgf", "/page/:slug"}, } for _, test := range tests { - isWildcard := isWildcardPathPart(test.input) - assert.Equal(t, test.expected, isWildcard, "isWildcard(%q) = %q; want %q", test.input, isWildcard, test.expected) + normalizedPath := normalizePath(test.input) + assert.Equal(t, test.expected, normalizedPath, "normalizePath(%q) = %q; want %q", test.input, normalizedPath, test.expected) } } -func assertGetRequest(t *testing.T, url string, expectedBodyContains string) { +func assertGetRequest(t *testing.T, url string, expectedBodyContains string, opts ...RequestOption) { + t.Helper() r := httptest.NewRequest("GET", url, nil) w := httptest.NewRecorder() - req, err := NewRequestWithContext(r, WithWorkerName("worker")) + req, err := NewRequestWithContext(r, opts...) assert.NoError(t, err) assert.NoError(t, ServeHTTP(w, req)) assert.Contains(t, w.Body.String(), expectedBodyContains) } -func TestTunnelLowLatencyRequest(t *testing.T) { +func TestTunnelLowLatencyRequest_worker(t *testing.T) { assert.NoError(t, Init( WithWorkers("worker", "testdata/sleep.php", 1), WithNumThreads(2), @@ -48,23 +49,70 @@ func TestTunnelLowLatencyRequest(t *testing.T) { WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))), )) defer Shutdown() + opt := WithWorkerName("worker") + wg := sync.WaitGroup{} // record request path as slow, manipulate thresholds to make it easy to trigger slowRequestThreshold = 1 * time.Millisecond slowThreadPercentile = 0 scaleWorkerThread(getWorkerByName("worker")) // the scaled thread should be low-latency only - assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms") - assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms") + assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms", opt) + assertGetRequest(t, 
"/slow/123/path?sleep=5", "slept for 5 ms", opt) // send 2 blocking requests that occupy all threads - go assertGetRequest(t, "/slow/123/path?sleep=500", "slept for 500 ms") - go assertGetRequest(t, "/slow/123/path?sleep=500", "slept for 500 ms") + wg.Add(2) + for i := 0; i < 2; i++ { + go func() { + assertGetRequest(t, "/slow/123/path?sleep=500", "slept for 500 ms", opt) + wg.Done() + }() + } time.Sleep(time.Millisecond * 100) // enough time to receive the requests // send a low latency request, it should not be blocked by the slow requests start := time.Now() - assertGetRequest(t, "/fast/123/path?sleep=0", "slept for 0 ms") + assertGetRequest(t, "/fast/123/path?sleep=0", "slept for 0 ms", opt) duration := time.Since(start) assert.Less(t, duration.Milliseconds(), int64(100), "the low latency request should not be blocked by the slow requests") + + // wait to avoid race conditions across tests + wg.Wait() +} + +func TestTunnelLowLatencyRequest_module(t *testing.T) { + assert.NoError(t, Init( + WithNumThreads(1), + WithMaxThreads(2), + WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))), + )) + defer Shutdown() + wg := sync.WaitGroup{} + + // record request path as slow, manipulate thresholds to make it easy to trigger + slowRequestThreshold = 1 * time.Millisecond + slowThreadPercentile = 0 + scaleRegularThread() // the scaled thread should be low-latency only + assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms") + assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms") + + // send 2 blocking requests that occupy all threads + wg.Add(2) + for i := 0; i < 2; i++ { + go func() { + assertGetRequest(t, "/testdata/sleep.php?sleep=500", "slept for 500 ms") + wg.Done() + }() + } + time.Sleep(time.Millisecond * 100) // enough time to receive the requests + + // send a low latency request, it should not be blocked by the slow requests + start := time.Now() + assertGetRequest(t, "/testdata/sleep.php/fastpath/?sleep=0", "slept 
for 0 ms") + duration := time.Since(start) + + assert.Less(t, duration.Milliseconds(), int64(100), "the low latency request should not be blocked by the slow requests") + + // wait to avoid race conditions across tests + wg.Wait() } diff --git a/scaling.go b/scaling.go index 07a21c5b6..b96dff333 100644 --- a/scaling.go +++ b/scaling.go @@ -63,6 +63,7 @@ func addRegularThread() (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } + triggerLatencyTrackingIfNeeded(thread) convertToRegularThread(thread) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -73,10 +74,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - if isNearThreadLimit() { - enableLatencyTracking.Store(true) - thread.isLowLatencyThread = true - } + triggerLatencyTrackingIfNeeded(thread) convertToWorkerThread(thread, worker) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -225,11 +223,5 @@ func deactivateThreads() { // } } - if enableLatencyTracking.Load() && !isNearThreadLimit() { - enableLatencyTracking.Store(false) - } -} - -func isNearThreadLimit() bool { - return len(autoScaledThreads) >= cap(autoScaledThreads)*slowThreadPercentile/100 + stopLatencyTrackingIfNeeded() } diff --git a/worker.go b/worker.go index e0a79008f..44a82cbc7 100644 --- a/worker.go +++ b/worker.go @@ -219,14 +219,11 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { // if the server is experiencing high latency, dispatch requests that are // expected to be slow directly to a number of restricted threads - if enableLatencyTracking.Load() && isHighLatencyRequest(fc) { - getRandomSlowThread(worker).requestChan <- fc - stallDuration := time.Since(fc.startedAt) - <-fc.done - - requestTime := time.Since(fc.startedAt) - metrics.StopWorkerRequest(worker.name, requestTime) - recordRequestLatency(fc, requestTime-stallDuration) + if 
latencyTrackingEnabled.Load() && isHighLatencyRequest(fc) { + worker.threadMutex.RLock() + slowThread := getRandomSlowThread(worker.threads) + worker.threadMutex.RUnlock() + worker.stallRequest(fc, slowThread.requestChan) return } @@ -251,10 +248,15 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { worker.threadMutex.RUnlock() // if no thread was available, mark the request as queued and apply the scaling strategy + worker.stallRequest(fc, worker.requestChan) +} + +// stall the request and trigger scaling or timeouts +func (worker *worker) stallRequest(fc *frankenPHPContext, requestChan chan *frankenPHPContext) { metrics.QueuedWorkerRequest(worker.name) for { select { - case worker.requestChan <- fc: + case requestChan <- fc: metrics.DequeuedWorkerRequest(worker.name) stallDuration := time.Since(fc.startedAt) <-fc.done From cb843557c8bb2706f8ce17d8ed365c9bf7b70b46 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 22:54:56 +0200 Subject: [PATCH 04/19] Refactoring. 
--- latencytracking.go | 13 +++++-------- latencytracking_test.go | 12 ++++++------ threadregular.go | 11 ++++++----- worker.go | 10 +++++----- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/latencytracking.go b/latencytracking.go index 0a340a31c..74780e6e9 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -79,28 +79,25 @@ func getRandomSlowThread(threads []*phpThread) *phpThread { panic("there must always be at least one slow thread") } -func recordSlowRequest(fc *frankenPHPContext, duration time.Duration) { - if duration > slowRequestThreshold { +// record a slow request path +func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTracking bool) { + if duration < slowRequestThreshold && !forceTracking { recordRequestLatency(fc, duration) } -} -// record a slow request path -func recordRequestLatency(fc *frankenPHPContext, duration time.Duration) { request := fc.getOriginalRequest() normalizedPath := normalizePath(request.URL.Path) logger.Debug("slow request detected", "path", normalizedPath, "duration", duration) - slowRequestsMu.Lock() // if too many slow paths are tracked, clear the map if len(slowRequestPaths) > maxTrackedPaths { slowRequestPaths = make(map[string]time.Duration) } + + // record the latency as a moving average recordedLatency, _ := slowRequestPaths[normalizedPath] - // average the recorded latency with the new latency slowRequestPaths[normalizedPath] = duration/2 + recordedLatency/2 - slowRequestsMu.Unlock() } diff --git a/latencytracking_test.go b/latencytracking_test.go index 23bf480f1..8a855a345 100644 --- a/latencytracking_test.go +++ b/latencytracking_test.go @@ -44,13 +44,12 @@ func assertGetRequest(t *testing.T, url string, expectedBodyContains string, opt func TestTunnelLowLatencyRequest_worker(t *testing.T) { assert.NoError(t, Init( WithWorkers("worker", "testdata/sleep.php", 1), - WithNumThreads(2), - WithMaxThreads(3), + WithNumThreads(2), // one 'high latency worker thread' (and one 
unused regular thread) + WithMaxThreads(3), // one 'low latency worker thread' WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))), )) defer Shutdown() opt := WithWorkerName("worker") - wg := sync.WaitGroup{} // record request path as slow, manipulate thresholds to make it easy to trigger slowRequestThreshold = 1 * time.Millisecond @@ -60,6 +59,7 @@ func TestTunnelLowLatencyRequest_worker(t *testing.T) { assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms", opt) // send 2 blocking requests that occupy all threads + wg := sync.WaitGroup{} wg.Add(2) for i := 0; i < 2; i++ { go func() { @@ -82,12 +82,11 @@ func TestTunnelLowLatencyRequest_worker(t *testing.T) { func TestTunnelLowLatencyRequest_module(t *testing.T) { assert.NoError(t, Init( - WithNumThreads(1), - WithMaxThreads(2), + WithNumThreads(1), // one 'high latency thread' + WithMaxThreads(2), // one 'low latency thread' WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))), )) defer Shutdown() - wg := sync.WaitGroup{} // record request path as slow, manipulate thresholds to make it easy to trigger slowRequestThreshold = 1 * time.Millisecond @@ -97,6 +96,7 @@ func TestTunnelLowLatencyRequest_module(t *testing.T) { assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms") // send 2 blocking requests that occupy all threads + wg := sync.WaitGroup{} wg.Add(2) for i := 0; i < 2; i++ { go func() { diff --git a/threadregular.go b/threadregular.go index a9003e074..8fda6891a 100644 --- a/threadregular.go +++ b/threadregular.go @@ -93,7 +93,7 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { regularThreadMu.RLock() slowThread := getRandomSlowThread(regularThreads) regularThreadMu.RUnlock() - stallRegularPHPRequests(fc, slowThread.requestChan) + stallRegularPHPRequests(fc, slowThread.requestChan, true) return } @@ -105,7 +105,7 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { regularThreadMu.RUnlock() <-fc.done 
metrics.StopRequest() - recordSlowRequest(fc, time.Since(fc.startedAt)) + trackRequestLatency(fc, time.Since(fc.startedAt), false) return default: @@ -115,10 +115,11 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { regularThreadMu.RUnlock() // if no thread was available, mark the request as queued and fan it out to all threads - stallRegularPHPRequests(fc, regularRequestChan) + stallRegularPHPRequests(fc, regularRequestChan, false) } -func stallRegularPHPRequests(fc *frankenPHPContext, requestChan chan *frankenPHPContext) { +// stall the request and trigger scaling or timeouts +func stallRegularPHPRequests(fc *frankenPHPContext, requestChan chan *frankenPHPContext, forceTracking bool) { metrics.QueuedRequest() for { select { @@ -127,7 +128,7 @@ func stallRegularPHPRequests(fc *frankenPHPContext, requestChan chan *frankenPHP stallTime := time.Since(fc.startedAt) <-fc.done metrics.StopRequest() - recordSlowRequest(fc, time.Since(fc.startedAt)-stallTime) + trackRequestLatency(fc, time.Since(fc.startedAt)-stallTime, forceTracking) return case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread diff --git a/worker.go b/worker.go index 44a82cbc7..17fe0f063 100644 --- a/worker.go +++ b/worker.go @@ -223,7 +223,7 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { worker.threadMutex.RLock() slowThread := getRandomSlowThread(worker.threads) worker.threadMutex.RUnlock() - worker.stallRequest(fc, slowThread.requestChan) + worker.stallRequest(fc, slowThread.requestChan, true) return } @@ -238,7 +238,7 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { requestTime := time.Since(fc.startedAt) metrics.StopWorkerRequest(worker.name, requestTime) - recordSlowRequest(fc, requestTime) + trackRequestLatency(fc, requestTime, false) return default: @@ -248,11 +248,11 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { worker.threadMutex.RUnlock() // if no thread was available, mark the request as 
queued and apply the scaling strategy - worker.stallRequest(fc, worker.requestChan) + worker.stallRequest(fc, worker.requestChan, false) } // stall the request and trigger scaling or timeouts -func (worker *worker) stallRequest(fc *frankenPHPContext, requestChan chan *frankenPHPContext) { +func (worker *worker) stallRequest(fc *frankenPHPContext, requestChan chan *frankenPHPContext, forceTracking bool) { metrics.QueuedWorkerRequest(worker.name) for { select { @@ -262,7 +262,7 @@ func (worker *worker) stallRequest(fc *frankenPHPContext, requestChan chan *fran <-fc.done requestTime := time.Since(fc.startedAt) metrics.StopWorkerRequest(worker.name, requestTime) - recordSlowRequest(fc, requestTime-stallDuration) + trackRequestLatency(fc, requestTime-stallDuration, forceTracking) return case scaleChan <- fc: From f3a62ad351408dc171917791dce9af7167c32c09 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 22:56:58 +0200 Subject: [PATCH 05/19] Add --rm to load test. --- testdata/performance/perf-test.sh | 1 + testdata/performance/performance-testing.md | 7 ------- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/testdata/performance/perf-test.sh b/testdata/performance/perf-test.sh index f69585c54..056e8d3d2 100755 --- a/testdata/performance/perf-test.sh +++ b/testdata/performance/perf-test.sh @@ -15,6 +15,7 @@ select filename in ./testdata/performance/*.js; do docker run --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ -p 8125:80 \ -v "$PWD:/go/src/app" \ + --rm \ --name load-test-container \ -e "MAX_THREADS=$maxThreads" \ -e "WORKER_THREADS=$workerThreads" \ diff --git a/testdata/performance/performance-testing.md b/testdata/performance/performance-testing.md index 717941a0a..fa8f7537b 100644 --- a/testdata/performance/performance-testing.md +++ b/testdata/performance/performance-testing.md @@ -10,10 +10,3 @@ bash testdata/performance/perf-test.sh This will build the `frankenphp-dev` Docker image and run it under the name 
'load-test-container' in the background. Additionally, it will run the `grafana/k6` container, and you'll be able to choose the load test you want to run. A `flamegraph.svg` will be created in the `testdata/performance` directory. - -If the load test has stopped prematurely, you might have to remove the container manually: - -```sh -docker stop load-test-container -docker rm load-test-container -``` From e2ad56394132a74473934f1ca7e22352ca276f8a Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 23:00:15 +0200 Subject: [PATCH 06/19] Fixes statement. --- latencytracking.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/latencytracking.go b/latencytracking.go index 74780e6e9..2b1e14a62 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -82,7 +82,7 @@ func getRandomSlowThread(threads []*phpThread) *phpThread { // record a slow request path func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTracking bool) { if duration < slowRequestThreshold && !forceTracking { - recordRequestLatency(fc, duration) + return } request := fc.getOriginalRequest() From c5e2c49b78a46612f5de9a74c8d8bdf25e5e7844 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 23:16:52 +0200 Subject: [PATCH 07/19] linting. 
--- latencytracking.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/latencytracking.go b/latencytracking.go index 2b1e14a62..4580705fd 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -96,7 +96,7 @@ func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTra } // record the latency as a moving average - recordedLatency, _ := slowRequestPaths[normalizedPath] + recordedLatency := slowRequestPaths[normalizedPath] slowRequestPaths[normalizedPath] = duration/2 + recordedLatency/2 slowRequestsMu.Unlock() } From d05c3ccc315c767234fc16c093228763337e2a53 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 23:20:18 +0200 Subject: [PATCH 08/19] fixes go.sh --- testdata/performance/start-server.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testdata/performance/start-server.sh b/testdata/performance/start-server.sh index 998c148cc..c968ada68 100755 --- a/testdata/performance/start-server.sh +++ b/testdata/performance/start-server.sh @@ -2,6 +2,6 @@ # build and run FrankenPHP with the k6.Caddyfile cd /go/src/app/caddy/frankenphp && - go build --buildvcs=false && + ../../go.sh build --buildvcs=false && cd ../../testdata/performance && /go/src/app/caddy/frankenphp/frankenphp run -c k6.Caddyfile From 5054dc5570a141fc6dcb38631c88933b6222aac1 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Fri, 12 Sep 2025 23:41:35 +0200 Subject: [PATCH 09/19] Sends hanging request on different path. 
--- testdata/performance/hanging-requests.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testdata/performance/hanging-requests.js b/testdata/performance/hanging-requests.js index db191fdef..0346e2b31 100644 --- a/testdata/performance/hanging-requests.js +++ b/testdata/performance/hanging-requests.js @@ -19,7 +19,7 @@ export const options = { export default function () { // 2% chance for a request that hangs for 15s if (Math.random() < 0.02) { - http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=15000&work=10000&output=100`) + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php/slow-path?sleep=15000&work=10000&output=100`) return } From 368a3cec10029c88dd23800e983b61998c1e9157 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 13 Sep 2025 11:58:42 +0200 Subject: [PATCH 10/19] Adds thread pools. --- frankenphp.go | 6 +-- latencytracking.go | 41 ++++----------- phpmainthread_test.go | 12 ++--- testdata/performance/hanging-requests.js | 2 +- testdata/performance/k6.Caddyfile | 13 +++-- threadpool.go | 65 ++++++++++++++++++++++++ threadregular.go | 51 +++++++------------ threadworker.go | 8 +-- worker.go | 54 +++++--------------- 9 files changed, 127 insertions(+), 125 deletions(-) create mode 100644 threadpool.go diff --git a/frankenphp.go b/frankenphp.go index ef3789bf0..aa015aed5 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -268,11 +268,7 @@ func Init(options ...Option) error { return err } - regularRequestChan = make(chan *frankenPHPContext, totalThreadCount-workerThreadCount) - regularThreads = make([]*phpThread, 0, totalThreadCount-workerThreadCount) - for i := 0; i < totalThreadCount-workerThreadCount; i++ { - convertToRegularThread(getInactivePHPThread()) - } + initRegularPHPThreads(totalThreadCount - workerThreadCount) if err := initWorkers(opt.workers); err != nil { return err diff --git a/latencytracking.go b/latencytracking.go index 4580705fd..cb1102f69 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -1,7 +1,6 @@ package 
frankenphp import ( - "math/rand" "regexp" "strings" "sync" @@ -19,7 +18,7 @@ var ( // requests taking longer than this are considered slow (var for tests) slowRequestThreshold = 1000 * time.Millisecond // % of autoscaled threads that are not marked as low latency (var for tests) - slowThreadPercentile = 80 + slowThreadPercentile = 40 latencyTrackingEnabled = atomic.Bool{} slowRequestsMu = sync.RWMutex{} @@ -53,32 +52,6 @@ func isNearThreadLimit() bool { return len(autoScaledThreads) >= cap(autoScaledThreads)*slowThreadPercentile/100 } -// get a random thread that is not marked as low latency -func getRandomSlowThread(threads []*phpThread) *phpThread { - slowThreadCount := 0 - for _, thread := range threads { - if !thread.isLowLatencyThread { - slowThreadCount++ - } - } - - if slowThreadCount == 0 { - panic("there must always be at least one slow thread") - } - - slowThreadNum := rand.Intn(slowThreadCount) - for _, thread := range threads { - if !thread.isLowLatencyThread { - if slowThreadNum == 0 { - return thread - } - slowThreadNum-- - } - } - - panic("there must always be at least one slow thread") -} - // record a slow request path func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTracking bool) { if duration < slowRequestThreshold && !forceTracking { @@ -98,13 +71,21 @@ func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTra // record the latency as a moving average recordedLatency := slowRequestPaths[normalizedPath] slowRequestPaths[normalizedPath] = duration/2 + recordedLatency/2 + + // remove the path if it is no longer considered slow + if forceTracking && slowRequestPaths[normalizedPath] < slowRequestThreshold { + delete(slowRequestPaths, normalizedPath) + } slowRequestsMu.Unlock() } // determine if a request is likely to be high latency based on the request path func isHighLatencyRequest(fc *frankenPHPContext) bool { - request := fc.getOriginalRequest() - normalizedPath := 
normalizePath(request.URL.Path) + if len(slowRequestPaths) == 0 { + return false + } + + normalizedPath := normalizePath(fc.getOriginalRequest().URL.Path) slowRequestsMu.RLock() latency, exists := slowRequestPaths[normalizedPath] diff --git a/phpmainthread_test.go b/phpmainthread_test.go index d5e801422..fd69dbc2f 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -45,12 +45,12 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) { worker := getDummyWorker("transition-worker-1.php") convertToWorkerThread(phpThreads[0], worker) assert.IsType(t, &workerThread{}, phpThreads[0].handler) - assert.Len(t, worker.threads, 1) + assert.Len(t, worker.threadPool.threads, 1) // transition back to inactive thread convertToInactiveThread(phpThreads[0]) assert.IsType(t, &inactiveThread{}, phpThreads[0].handler) - assert.Len(t, worker.threads, 0) + assert.Len(t, worker.threadPool.threads, 0) drainPHPThreads() assert.Nil(t, phpThreads) @@ -68,15 +68,15 @@ func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { convertToWorkerThread(phpThreads[0], firstWorker) firstHandler := phpThreads[0].handler.(*workerThread) assert.Same(t, firstWorker, firstHandler.worker) - assert.Len(t, firstWorker.threads, 1) - assert.Len(t, secondWorker.threads, 0) + assert.Len(t, firstWorker.threadPool.threads, 1) + assert.Len(t, secondWorker.threadPool.threads, 0) // convert to second worker thread convertToWorkerThread(phpThreads[0], secondWorker) secondHandler := phpThreads[0].handler.(*workerThread) assert.Same(t, secondWorker, secondHandler.worker) - assert.Len(t, firstWorker.threads, 0) - assert.Len(t, secondWorker.threads, 1) + assert.Len(t, firstWorker.threadPool.threads, 0) + assert.Len(t, secondWorker.threadPool.threads, 1) drainPHPThreads() assert.Nil(t, phpThreads) diff --git a/testdata/performance/hanging-requests.js b/testdata/performance/hanging-requests.js index 0346e2b31..4c772cc0a 100644 --- a/testdata/performance/hanging-requests.js +++ 
b/testdata/performance/hanging-requests.js @@ -19,7 +19,7 @@ export const options = { export default function () { // 2% chance for a request that hangs for 15s if (Math.random() < 0.02) { - http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php/slow-path?sleep=15000&work=10000&output=100`) + http.get(`${__ENV.CADDY_HOSTNAME}/slowpath/slow-path?sleep=15000&work=10000&output=100`) return } diff --git a/testdata/performance/k6.Caddyfile b/testdata/performance/k6.Caddyfile index 866bd6619..ac19a990f 100644 --- a/testdata/performance/k6.Caddyfile +++ b/testdata/performance/k6.Caddyfile @@ -2,18 +2,23 @@ frankenphp { max_threads {$MAX_THREADS} num_threads {$NUM_THREADS} - worker { - file /go/src/app/testdata/{$WORKER_FILE:sleep.php} - num {$WORKER_THREADS} + max_wait_time 30s + + php_ini { + max_execution_time 30 } } } :80 { route { - root /go/src/app/testdata php { root /go/src/app/testdata + worker { + file /go/src/app/testdata/{$WORKER_FILE:sleep.php} + num {$WORKER_THREADS} + match * + } } } } diff --git a/threadpool.go b/threadpool.go new file mode 100644 index 000000000..162f5f792 --- /dev/null +++ b/threadpool.go @@ -0,0 +1,65 @@ +package frankenphp + +import ( + "math/rand/v2" + "sync" +) + +// TODO: dynamic splitting? +type threadPool struct { + threads []*phpThread + slowThreads []*phpThread + mu sync.RWMutex + ch chan *frankenPHPContext +} + +func newThreadPool(capacity int) *threadPool { + return &threadPool{ + threads: make([]*phpThread, 0, capacity), + mu: sync.RWMutex{}, + ch: make(chan *frankenPHPContext), + } +} + +func (p *threadPool) attach(thread *phpThread) { + p.mu.Lock() + p.threads = append(p.threads, thread) + if !thread.isLowLatencyThread { + p.slowThreads = append(p.slowThreads, thread) + } + p.mu.Unlock() +} + +func (p *threadPool) detach(thread *phpThread) { + p.mu.Lock() + for i, t := range p.threads { + if t == thread { + p.threads = append(p.threads[:i], p.threads[i+1:]...) 
+ break + } + } + if !thread.isLowLatencyThread { + for i, t := range p.slowThreads { + if t == thread { + p.slowThreads = append(p.slowThreads[:i], p.slowThreads[i+1:]...) + break + } + } + } + p.mu.Unlock() +} + +func (p *threadPool) getRandomSlowThread() *phpThread { + p.mu.RLock() + thread := p.slowThreads[rand.IntN(len(p.slowThreads))] + p.mu.RUnlock() + + return thread +} + +func (p *threadPool) len() int { + p.mu.RLock() + l := len(p.threads) + p.mu.RUnlock() + return l +} diff --git a/threadregular.go b/threadregular.go index 8fda6891a..bd36897b1 100644 --- a/threadregular.go +++ b/threadregular.go @@ -1,7 +1,6 @@ package frankenphp import ( - "sync" "time" ) @@ -15,24 +14,29 @@ type regularThread struct { } var ( - regularThreads []*phpThread - regularThreadMu = sync.RWMutex{} - regularRequestChan chan *frankenPHPContext + regularThreadPool *threadPool ) +func initRegularPHPThreads(num int) { + regularThreadPool = newThreadPool(num) + for i := 0; i < num; i++ { + convertToRegularThread(getInactivePHPThread()) + } +} + func convertToRegularThread(thread *phpThread) { thread.setHandler(®ularThread{ thread: thread, state: thread.state, }) - attachRegularThread(thread) + regularThreadPool.attach(thread) } // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *regularThread) beforeScriptExecution() string { switch handler.state.get() { case stateTransitionRequested: - detachRegularThread(handler.thread) + regularThreadPool.detach(handler.thread) return handler.thread.transitionToNewHandler() case stateTransitionComplete: handler.state.set(stateReady) @@ -40,7 +44,7 @@ func (handler *regularThread) beforeScriptExecution() string { case stateReady: return handler.waitForRequest() case stateShuttingDown: - detachRegularThread(handler.thread) + regularThreadPool.detach(handler.thread) // signal to stop return "" } @@ -71,7 +75,7 @@ func (handler *regularThread) waitForRequest() string { // go back to 
beforeScriptExecution return handler.beforeScriptExecution() case fc = <-thread.requestChan: - case fc = <-regularRequestChan: + case fc = <-regularThreadPool.ch: } handler.requestContext = fc @@ -90,19 +94,17 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { metrics.StartRequest() if latencyTrackingEnabled.Load() && isHighLatencyRequest(fc) { - regularThreadMu.RLock() - slowThread := getRandomSlowThread(regularThreads) - regularThreadMu.RUnlock() + slowThread := regularThreadPool.getRandomSlowThread() stallRegularPHPRequests(fc, slowThread.requestChan, true) return } - regularThreadMu.RLock() - for _, thread := range regularThreads { + regularThreadPool.mu.RLock() + for _, thread := range regularThreadPool.threads { select { case thread.requestChan <- fc: - regularThreadMu.RUnlock() + regularThreadPool.mu.RUnlock() <-fc.done metrics.StopRequest() trackRequestLatency(fc, time.Since(fc.startedAt), false) @@ -112,10 +114,10 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { // thread is busy, continue } } - regularThreadMu.RUnlock() + regularThreadPool.mu.RUnlock() // if no thread was available, mark the request as queued and fan it out to all threads - stallRegularPHPRequests(fc, regularRequestChan, false) + stallRegularPHPRequests(fc, regularThreadPool.ch, false) } // stall the request and trigger scaling or timeouts @@ -140,20 +142,3 @@ func stallRegularPHPRequests(fc *frankenPHPContext, requestChan chan *frankenPHP } } } - -func attachRegularThread(thread *phpThread) { - regularThreadMu.Lock() - regularThreads = append(regularThreads, thread) - regularThreadMu.Unlock() -} - -func detachRegularThread(thread *phpThread) { - regularThreadMu.Lock() - for i, t := range regularThreads { - if t == thread { - regularThreads = append(regularThreads[:i], regularThreads[i+1:]...) 
- break - } - } - regularThreadMu.Unlock() -} diff --git a/threadworker.go b/threadworker.go index b7dc82036..dbdbb75ce 100644 --- a/threadworker.go +++ b/threadworker.go @@ -33,14 +33,14 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { maxConsecutiveFailures: worker.maxConsecutiveFailures, }, }) - worker.attachThread(thread) + worker.threadPool.attach(thread) } // beforeScriptExecution returns the name of the script or an empty string on shutdown func (handler *workerThread) beforeScriptExecution() string { switch handler.state.get() { case stateTransitionRequested: - handler.worker.detachThread(handler.thread) + handler.worker.threadPool.detach(handler.thread) return handler.thread.transitionToNewHandler() case stateRestarting: handler.state.set(stateYielding) @@ -50,7 +50,7 @@ func (handler *workerThread) beforeScriptExecution() string { setupWorkerScript(handler, handler.worker) return handler.worker.fileName case stateShuttingDown: - handler.worker.detachThread(handler.thread) + handler.worker.threadPool.detach(handler.thread) // signal to stop return "" } @@ -181,7 +181,7 @@ func (handler *workerThread) waitForWorkerRequest() bool { return false case fc = <-handler.thread.requestChan: - case fc = <-handler.worker.requestChan: + case fc = <-handler.worker.threadPool.ch: } handler.workerContext = fc diff --git a/worker.go b/worker.go index 17fe0f063..501f5f864 100644 --- a/worker.go +++ b/worker.go @@ -18,9 +18,7 @@ type worker struct { fileName string num int env PreparedEnv - requestChan chan *frankenPHPContext - threads []*phpThread - threadMutex sync.RWMutex + threadPool *threadPool allowPathMatching bool maxConsecutiveFailures int } @@ -121,8 +119,7 @@ func newWorker(o workerOpt) (*worker, error) { fileName: absFileName, num: o.num, env: o.env, - requestChan: make(chan *frankenPHPContext), - threads: make([]*phpThread, 0, o.num), + threadPool: newThreadPool(o.num), allowPathMatching: allowPathMatching, maxConsecutiveFailures: 
o.maxConsecutiveFailures, } @@ -139,9 +136,9 @@ func drainWorkerThreads() []*phpThread { ready := sync.WaitGroup{} drainedThreads := make([]*phpThread, 0) for _, worker := range workers { - worker.threadMutex.RLock() - ready.Add(len(worker.threads)) - for _, thread := range worker.threads { + worker.threadPool.mu.RLock() + ready.Add(len(worker.threadPool.threads)) + for _, thread := range worker.threadPool.threads { if !thread.state.requestSafeStateChange(stateRestarting) { // no state change allowed == thread is shutting down // we'll proceed to restart all other threads anyways @@ -154,7 +151,7 @@ func drainWorkerThreads() []*phpThread { ready.Done() }(thread) } - worker.threadMutex.RUnlock() + worker.threadPool.mu.RUnlock() } ready.Wait() @@ -189,51 +186,24 @@ func getDirectoriesToWatch(workerOpts []workerOpt) []string { return directoriesToWatch } -func (worker *worker) attachThread(thread *phpThread) { - worker.threadMutex.Lock() - worker.threads = append(worker.threads, thread) - worker.threadMutex.Unlock() -} - -func (worker *worker) detachThread(thread *phpThread) { - worker.threadMutex.Lock() - for i, t := range worker.threads { - if t == thread { - worker.threads = append(worker.threads[:i], worker.threads[i+1:]...) 
- break - } - } - worker.threadMutex.Unlock() -} - -func (worker *worker) countThreads() int { - worker.threadMutex.RLock() - l := len(worker.threads) - worker.threadMutex.RUnlock() - - return l -} - func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.StartWorkerRequest(worker.name) // if the server is experiencing high latency, dispatch requests that are // expected to be slow directly to a number of restricted threads if latencyTrackingEnabled.Load() && isHighLatencyRequest(fc) { - worker.threadMutex.RLock() - slowThread := getRandomSlowThread(worker.threads) - worker.threadMutex.RUnlock() + slowThread := worker.threadPool.getRandomSlowThread() worker.stallRequest(fc, slowThread.requestChan, true) return } // dispatch requests to all worker threads in order - worker.threadMutex.RLock() - for _, thread := range worker.threads { + worker.threadPool.mu.RLock() + for _, thread := range worker.threadPool.threads { select { case thread.requestChan <- fc: - worker.threadMutex.RUnlock() + worker.threadPool.mu.RUnlock() <-fc.done requestTime := time.Since(fc.startedAt) @@ -245,10 +215,10 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { // thread is busy, continue } } - worker.threadMutex.RUnlock() + worker.threadPool.mu.RUnlock() // if no thread was available, mark the request as queued and apply the scaling strategy - worker.stallRequest(fc, worker.requestChan, false) + worker.stallRequest(fc, worker.threadPool.ch, false) } // stall the request and trigger scaling or timeouts From fa80d40fdb9ac96dc8cd47b0f93c214ade3db67e Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 12:09:45 +0200 Subject: [PATCH 11/19] Adds gitignore for perf tests. 
--- testdata/performance/.gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 testdata/performance/.gitignore diff --git a/testdata/performance/.gitignore b/testdata/performance/.gitignore new file mode 100644 index 000000000..b748d9b64 --- /dev/null +++ b/testdata/performance/.gitignore @@ -0,0 +1 @@ +flamegraph.sh \ No newline at end of file From 77a29f0ea1e2c8d7aabd4602fa4118f96f14034c Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 12:11:42 +0200 Subject: [PATCH 12/19] Refactors request dispatching. --- frankenphp.go | 8 ----- threadpool.go | 94 +++++++++++++++++++++++++++++++++--------------- threadregular.go | 62 ++++++++++---------------------- threadworker.go | 13 +++---- worker.go | 69 +++++++++++------------------------ 5 files changed, 112 insertions(+), 134 deletions(-) diff --git a/frankenphp.go b/frankenphp.go index aa015aed5..40f46ef3e 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -601,11 +601,3 @@ func freeArgs(argv []*C.char) { C.free(unsafe.Pointer(arg)) } } - -func timeoutChan(timeout time.Duration) <-chan time.Time { - if timeout == 0 { - return nil - } - - return time.After(timeout) -} diff --git a/threadpool.go b/threadpool.go index 162f5f792..37424d7ba 100644 --- a/threadpool.go +++ b/threadpool.go @@ -1,65 +1,103 @@ package frankenphp import ( - "math/rand/v2" "sync" + "time" ) -// TODO: dynamic splitting? 
type threadPool struct { - threads []*phpThread - slowThreads []*phpThread - mu sync.RWMutex - ch chan *frankenPHPContext + threads []*phpThread + mu sync.RWMutex + ch chan *frankenPHPContext + fastChan chan *frankenPHPContext } func newThreadPool(capacity int) *threadPool { return &threadPool{ - threads: make([]*phpThread, 0, capacity), - mu: sync.RWMutex{}, - ch: make(chan *frankenPHPContext), + threads: make([]*phpThread, 0, capacity), + mu: sync.RWMutex{}, + ch: make(chan *frankenPHPContext), + fastChan: make(chan *frankenPHPContext), } } func (p *threadPool) attach(thread *phpThread) { p.mu.Lock() p.threads = append(p.threads, thread) - if !thread.isLowLatencyThread { - p.slowThreads = append(p.slowThreads, thread) - } p.mu.Unlock() } func (p *threadPool) detach(thread *phpThread) { p.mu.Lock() - for i, t := range p.threads { - if t == thread { + for i := len(p.threads) - 1; i >= 0; i-- { + if thread == p.threads[i] { p.threads = append(p.threads[:i], p.threads[i+1:]...) break } } - if !thread.isLowLatencyThread { - for i, t := range p.slowThreads { - if t == thread { - p.slowThreads = append(p.slowThreads[:i], p.slowThreads[i+1:]...) 
- break - } - } - } p.mu.Unlock() } -func (p *threadPool) getRandomSlowThread() *phpThread { +func (p *threadPool) len() int { p.mu.RLock() - thread := p.slowThreads[rand.IntN(len(p.slowThreads))] + l := len(p.threads) p.mu.RUnlock() + return l +} - return thread +// get the correct request chan for queued requests +func (p *threadPool) requestChan(thread *phpThread) chan *frankenPHPContext { + if thread.isLowLatencyThread { + return p.fastChan + } + return p.ch } -func (p *threadPool) len() int { +// dispatch to all threads in order if any is available +// dispatching in order minimizes memory usage and external connections +func (p *threadPool) dispatchRequest(fc *frankenPHPContext) bool { p.mu.RLock() - l := len(p.threads) + for _, thread := range p.threads { + select { + case thread.requestChan <- fc: + p.mu.RUnlock() + return true + default: + // thread is busy, continue + } + } p.mu.RUnlock() - return l + + return false +} + +// dispatch request to all threads, triggering scaling or timeouts as needed +func (p *threadPool) queueRequest(fc *frankenPHPContext, isFastRequest bool) bool { + var fastChan chan *frankenPHPContext + if isFastRequest { + fastChan = p.fastChan + } + + for { + select { + case p.ch <- fc: + return true + case fastChan <- fc: + return true + case scaleChan <- fc: + // the request has triggered scaling, continue to wait for a thread + case <-timeoutChan(maxWaitTime): + // the request has timed out stalling + fc.reject(504, "Gateway Timeout") + return false + } + } +} + +func timeoutChan(timeout time.Duration) <-chan time.Time { + if timeout == 0 { + return nil + } + + return time.After(timeout) } diff --git a/threadregular.go b/threadregular.go index bd36897b1..61571f6cf 100644 --- a/threadregular.go +++ b/threadregular.go @@ -75,7 +75,7 @@ func (handler *regularThread) waitForRequest() string { // go back to beforeScriptExecution return handler.beforeScriptExecution() case fc = <-thread.requestChan: - case fc = <-regularThreadPool.ch: + 
case fc = <-regularThreadPool.requestChan(thread): } handler.requestContext = fc @@ -93,52 +93,28 @@ func (handler *regularThread) afterRequest() { func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { metrics.StartRequest() - if latencyTrackingEnabled.Load() && isHighLatencyRequest(fc) { - slowThread := regularThreadPool.getRandomSlowThread() - stallRegularPHPRequests(fc, slowThread.requestChan, true) + trackLatency := latencyTrackingEnabled.Load() + isSlowRequest := trackLatency && isHighLatencyRequest(fc) - return - } + // dispatch requests to all regular threads in order + if !isSlowRequest && regularThreadPool.dispatchRequest(fc) { + <-fc.done + metrics.StopRequest() + trackRequestLatency(fc, time.Since(fc.startedAt), false) - regularThreadPool.mu.RLock() - for _, thread := range regularThreadPool.threads { - select { - case thread.requestChan <- fc: - regularThreadPool.mu.RUnlock() - <-fc.done - metrics.StopRequest() - trackRequestLatency(fc, time.Since(fc.startedAt), false) - - return - default: - // thread is busy, continue - } + return } - regularThreadPool.mu.RUnlock() - - // if no thread was available, mark the request as queued and fan it out to all threads - stallRegularPHPRequests(fc, regularThreadPool.ch, false) -} -// stall the request and trigger scaling or timeouts -func stallRegularPHPRequests(fc *frankenPHPContext, requestChan chan *frankenPHPContext, forceTracking bool) { metrics.QueuedRequest() - for { - select { - case requestChan <- fc: - metrics.DequeuedRequest() - stallTime := time.Since(fc.startedAt) - <-fc.done - metrics.StopRequest() - trackRequestLatency(fc, time.Since(fc.startedAt)-stallTime, forceTracking) - return - case scaleChan <- fc: - // the request has triggered scaling, continue to wait for a thread - case <-timeoutChan(maxWaitTime): - // the request has timed out stalling - metrics.DequeuedRequest() - fc.reject(504, "Gateway Timeout") - return - } + requestWasReceived := regularThreadPool.queueRequest(fc, 
trackLatency && !isSlowRequest) + metrics.DequeuedRequest() + + if !requestWasReceived { + return } + + stallTime := time.Since(fc.startedAt) + <-fc.done + metrics.StopRequest() + trackRequestLatency(fc, time.Since(fc.startedAt)-stallTime, isSlowRequest) } diff --git a/threadworker.go b/threadworker.go index dbdbb75ce..ff1ce5908 100644 --- a/threadworker.go +++ b/threadworker.go @@ -145,7 +145,8 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script. func (handler *workerThread) waitForWorkerRequest() bool { // unpin any memory left over from previous requests - handler.thread.Unpin() + thread := handler.thread + thread.Unpin() ctx := context.Background() logger.LogAttrs(ctx, slog.LevelDebug, "waiting for request", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) @@ -170,8 +171,8 @@ func (handler *workerThread) waitForWorkerRequest() bool { var fc *frankenPHPContext select { - case <-handler.thread.drainChan: - logger.LogAttrs(ctx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) + case <-thread.drainChan: + logger.LogAttrs(ctx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", thread.threadIndex)) // flush the opcache when restarting due to watcher or admin api // note: this is done right before frankenphp_handle_request() returns 'false' @@ -180,14 +181,14 @@ func (handler *workerThread) waitForWorkerRequest() bool { } return false - case fc = <-handler.thread.requestChan: - case fc = <-handler.worker.threadPool.ch: + case fc = <-thread.requestChan: + case fc = <-handler.worker.threadPool.requestChan(thread): } handler.workerContext = fc handler.state.markAsWaiting(false) - logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), 
slog.Int("thread", handler.thread.threadIndex), slog.String("url", fc.request.RequestURI)) + logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI)) return true } diff --git a/worker.go b/worker.go index 501f5f864..d5bfd80d5 100644 --- a/worker.go +++ b/worker.go @@ -189,59 +189,30 @@ func getDirectoriesToWatch(workerOpts []workerOpt) []string { func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.StartWorkerRequest(worker.name) - // if the server is experiencing high latency, dispatch requests that are - // expected to be slow directly to a number of restricted threads - if latencyTrackingEnabled.Load() && isHighLatencyRequest(fc) { - slowThread := worker.threadPool.getRandomSlowThread() - worker.stallRequest(fc, slowThread.requestChan, true) - - return - } + trackLatency := latencyTrackingEnabled.Load() + isSlowRequest := trackLatency && isHighLatencyRequest(fc) // dispatch requests to all worker threads in order - worker.threadPool.mu.RLock() - for _, thread := range worker.threadPool.threads { - select { - case thread.requestChan <- fc: - worker.threadPool.mu.RUnlock() - <-fc.done - - requestTime := time.Since(fc.startedAt) - metrics.StopWorkerRequest(worker.name, requestTime) - trackRequestLatency(fc, requestTime, false) - - return - default: - // thread is busy, continue - } - } - worker.threadPool.mu.RUnlock() + if !isSlowRequest && worker.threadPool.dispatchRequest(fc) { + <-fc.done + requestTime := time.Since(fc.startedAt) + metrics.StopWorkerRequest(worker.name, requestTime) + trackRequestLatency(fc, requestTime, false) - // if no thread was available, mark the request as queued and apply the scaling strategy - worker.stallRequest(fc, worker.threadPool.ch, false) -} + return + } -// stall the request and trigger scaling or timeouts -func (worker *worker) stallRequest(fc *frankenPHPContext, requestChan 
chan *frankenPHPContext, forceTracking bool) { metrics.QueuedWorkerRequest(worker.name) - for { - select { - case requestChan <- fc: - metrics.DequeuedWorkerRequest(worker.name) - stallDuration := time.Since(fc.startedAt) - <-fc.done - requestTime := time.Since(fc.startedAt) - metrics.StopWorkerRequest(worker.name, requestTime) - trackRequestLatency(fc, requestTime-stallDuration, forceTracking) - - return - case scaleChan <- fc: - // the request has triggered scaling, continue to wait for a thread - case <-timeoutChan(maxWaitTime): - metrics.DequeuedWorkerRequest(worker.name) - // the request has timed out stalling - fc.reject(504, "Gateway Timeout") - return - } + requestWasReceived := worker.threadPool.queueRequest(fc, trackLatency && !isSlowRequest) + metrics.DequeuedWorkerRequest(worker.name) + + if !requestWasReceived { + return } + + stallDuration := time.Since(fc.startedAt) + <-fc.done + requestTime := time.Since(fc.startedAt) + metrics.StopWorkerRequest(worker.name, requestTime) + trackRequestLatency(fc, requestTime-stallDuration, isSlowRequest) } From e50593a11d347796deede3c5af0109a53927e65c Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 12:12:50 +0200 Subject: [PATCH 13/19] Fixes gitignore. --- testdata/performance/.gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testdata/performance/.gitignore b/testdata/performance/.gitignore index b748d9b64..2b3984cf9 100644 --- a/testdata/performance/.gitignore +++ b/testdata/performance/.gitignore @@ -1 +1 @@ -flamegraph.sh \ No newline at end of file +flamegraph.svg \ No newline at end of file From 57c2ed43787af94077b6bd4df86d3f1843992779 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 13:07:54 +0200 Subject: [PATCH 14/19] Refactoring. 
--- latencytracking.go | 12 +++++------ scaling.go | 6 +++--- threadpool.go | 54 ++++++++++++++++++++-------------------------- threadregular.go | 4 +--- 4 files changed, 33 insertions(+), 43 deletions(-) diff --git a/latencytracking.go b/latencytracking.go index cb1102f69..c84e77196 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -8,15 +8,15 @@ import ( "time" ) -// limit of tracked path children +// hard limit of tracked paths const maxTrackedPaths = 1000 -// path parts longer than this are considered a wildcard +// path parts longer than this are considered a slug const charLimitWildcard = 50 var ( // requests taking longer than this are considered slow (var for tests) - slowRequestThreshold = 1000 * time.Millisecond + slowRequestThreshold = 1500 * time.Millisecond // % of autoscaled threads that are not marked as low latency (var for tests) slowThreadPercentile = 40 @@ -33,7 +33,7 @@ func initLatencyTracking() { } // trigger latency tracking while scaling threads -func triggerLatencyTrackingIfNeeded(thread *phpThread) { +func triggerLatencyTracking(thread *phpThread) { if isNearThreadLimit() { latencyTrackingEnabled.Store(true) thread.isLowLatencyThread = true @@ -41,7 +41,7 @@ func triggerLatencyTrackingIfNeeded(thread *phpThread) { } } -func stopLatencyTrackingIfNeeded() { +func stopLatencyTracking() { if latencyTrackingEnabled.Load() && !isNearThreadLimit() { latencyTrackingEnabled.Store(false) logger.Debug("latency tracking disabled") @@ -98,7 +98,6 @@ func isHighLatencyRequest(fc *frankenPHPContext) bool { return false } -// TODO: query? func normalizePath(path string) string { pathLen := len(path) if pathLen > 1 && path[pathLen-1] == '/' { @@ -126,6 +125,7 @@ func normalizePath(path string) string { // determine if a path part is a wildcard func normalizePathPart(part string) string { if len(part) > charLimitWildcard { + // TODO: better slug detection? 
return ":slug" } diff --git a/scaling.go b/scaling.go index b96dff333..db0102f86 100644 --- a/scaling.go +++ b/scaling.go @@ -63,7 +63,7 @@ func addRegularThread() (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - triggerLatencyTrackingIfNeeded(thread) + triggerLatencyTracking(thread) convertToRegularThread(thread) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -74,7 +74,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - triggerLatencyTrackingIfNeeded(thread) + triggerLatencyTracking(thread) convertToWorkerThread(thread, worker) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -223,5 +223,5 @@ func deactivateThreads() { // } } - stopLatencyTrackingIfNeeded() + stopLatencyTracking() } diff --git a/threadpool.go b/threadpool.go index 37424d7ba..443fb8efc 100644 --- a/threadpool.go +++ b/threadpool.go @@ -5,19 +5,21 @@ import ( "time" ) +// threadPool manages a pool of PHP threads +// used for both worker and regular threads type threadPool struct { - threads []*phpThread - mu sync.RWMutex - ch chan *frankenPHPContext - fastChan chan *frankenPHPContext + threads []*phpThread + mu sync.RWMutex + ch chan *frankenPHPContext + lowLatencyChan chan *frankenPHPContext } func newThreadPool(capacity int) *threadPool { return &threadPool{ - threads: make([]*phpThread, 0, capacity), - mu: sync.RWMutex{}, - ch: make(chan *frankenPHPContext), - fastChan: make(chan *frankenPHPContext), + threads: make([]*phpThread, 0, capacity), + mu: sync.RWMutex{}, + ch: make(chan *frankenPHPContext), + lowLatencyChan: make(chan *frankenPHPContext), } } @@ -38,17 +40,10 @@ func (p *threadPool) detach(thread *phpThread) { p.mu.Unlock() } -func (p *threadPool) len() int { - p.mu.RLock() - l := len(p.threads) - p.mu.RUnlock() - return l -} - // get the correct request chan for queued requests func (p *threadPool) 
requestChan(thread *phpThread) chan *frankenPHPContext { if thread.isLowLatencyThread { - return p.fastChan + return p.lowLatencyChan } return p.ch } @@ -72,32 +67,29 @@ func (p *threadPool) dispatchRequest(fc *frankenPHPContext) bool { } // dispatch request to all threads, triggering scaling or timeouts as needed -func (p *threadPool) queueRequest(fc *frankenPHPContext, isFastRequest bool) bool { - var fastChan chan *frankenPHPContext - if isFastRequest { - fastChan = p.fastChan +func (p *threadPool) queueRequest(fc *frankenPHPContext, isLowLatencyRequest bool) bool { + var lowLatencyChan chan *frankenPHPContext + if isLowLatencyRequest { + lowLatencyChan = p.lowLatencyChan + } + + var timeoutChan <-chan time.Time + if maxWaitTime > 0 { + timeoutChan = time.After(maxWaitTime) } for { select { case p.ch <- fc: return true - case fastChan <- fc: - return true + case lowLatencyChan <- fc: + return true // 'low laten' case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread - case <-timeoutChan(maxWaitTime): + case <-timeoutChan: // the request has timed out stalling fc.reject(504, "Gateway Timeout") return false } } } - -func timeoutChan(timeout time.Duration) <-chan time.Time { - if timeout == 0 { - return nil - } - - return time.After(timeout) -} diff --git a/threadregular.go b/threadregular.go index 61571f6cf..4d46c8521 100644 --- a/threadregular.go +++ b/threadregular.go @@ -13,9 +13,7 @@ type regularThread struct { requestContext *frankenPHPContext } -var ( - regularThreadPool *threadPool -) +var regularThreadPool *threadPool func initRegularPHPThreads(num int) { regularThreadPool = newThreadPool(num) From 91e28071a26a7de43a44873e0a4b31b4231b7884 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 16:11:56 +0200 Subject: [PATCH 15/19] Makes latency tracking optional. 
--- caddy/app.go | 6 +++ context.go | 3 +- frankenphp.go | 2 +- latencytracking.go | 77 ++++++++++++++++++++----------- latencytracking_test.go | 6 ++- options.go | 25 +++++++--- scaling.go | 11 +++-- testdata/performance/perf-test.sh | 8 ++-- threadpool.go | 15 +++--- threadregular.go | 7 +-- worker.go | 5 +- 11 files changed, 108 insertions(+), 57 deletions(-) diff --git a/caddy/app.go b/caddy/app.go index 01831c533..a0c785619 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -37,6 +37,8 @@ type FrankenPHPApp struct { PhpIni map[string]string `json:"php_ini,omitempty"` // The maximum amount of time a request may be stalled waiting for a thread MaxWaitTime time.Duration `json:"max_wait_time,omitempty"` + // Enable latency tracking mode (experimental) + LatencyTracking bool `json:"low_latency_tracking,omitempty"` metrics frankenphp.Metrics logger *slog.Logger @@ -117,6 +119,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithMetrics(f.metrics), frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), + frankenphp.WithLatencyTracking(f.LatencyTracking), } for _, w := range append(f.Workers) { workerOpts := []frankenphp.WorkerOption{ @@ -253,6 +256,9 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } f.Workers = append(f.Workers, wc) + + case "latency_tracking": + f.LatencyTracking = true default: allowedDirectives := "num_threads, max_threads, php_ini, worker, max_wait_time" return wrongSubDirectiveError("frankenphp", allowedDirectives, d.Val()) diff --git a/context.go b/context.go index a67bf50dd..f6af45564 100644 --- a/context.go +++ b/context.go @@ -26,7 +26,8 @@ type frankenPHPContext struct { scriptFilename string // Whether the request is already closed by us - isDone bool + isDone bool + isLowLatencyRequest bool responseWriter http.ResponseWriter diff --git a/frankenphp.go b/frankenphp.go index 40f46ef3e..2d953f417 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -274,7 +274,7 @@ func Init(options 
...Option) error { return err } - initLatencyTracking() + initLatencyTracking(opt.latencyTracking) initAutoScaling(mainThread) diff --git a/latencytracking.go b/latencytracking.go index c84e77196..c8e19db57 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -11,45 +11,55 @@ import ( // hard limit of tracked paths const maxTrackedPaths = 1000 -// path parts longer than this are considered a slug -const charLimitWildcard = 50 +// path parts longer than this are considered a wildcard +const maxPathPartChars = 50 + +// max amount of requests being drained when a new slow path is recorded +const maxRequestDrainage = 100 var ( + // TODO: test with different values // requests taking longer than this are considered slow (var for tests) slowRequestThreshold = 1500 * time.Millisecond - // % of autoscaled threads that are not marked as low latency (var for tests) - slowThreadPercentile = 40 + // % of initial threads that are marked as low latency threads(var for tests) + lowLatencyPercentile = 25 - latencyTrackingEnabled = atomic.Bool{} + latencyTrackingEnabled = false + latencyTrackingActive = atomic.Bool{} slowRequestsMu = sync.RWMutex{} slowRequestPaths map[string]time.Duration numRe = regexp.MustCompile(`^\d+$`) uuidRe = regexp.MustCompile(`^[a-f0-9-]{36}$`) ) -func initLatencyTracking() { - latencyTrackingEnabled.Store(false) +func initLatencyTracking(enabled bool) { + latencyTrackingActive.Store(false) slowRequestPaths = make(map[string]time.Duration) + latencyTrackingEnabled = enabled } -// trigger latency tracking while scaling threads -func triggerLatencyTracking(thread *phpThread) { - if isNearThreadLimit() { - latencyTrackingEnabled.Store(true) - thread.isLowLatencyThread = true - logger.Debug("low latency thread spawned") +func triggerLatencyTracking(thread *phpThread, threadAmount int, threadLimit int) { + if !latencyTrackingEnabled || !isCloseToThreadLimit(threadAmount, threadLimit) { + return + } + + thread.isLowLatencyThread = true + + if 
!latencyTrackingActive.Load() { + latencyTrackingActive.Store(true) + logger.Info("latency tracking enabled") } } -func stopLatencyTracking() { - if latencyTrackingEnabled.Load() && !isNearThreadLimit() { - latencyTrackingEnabled.Store(false) - logger.Debug("latency tracking disabled") +func stopLatencyTracking(threadAmount int, threadLimit int) { + if latencyTrackingActive.Load() && !isCloseToThreadLimit(threadAmount, threadLimit) { + latencyTrackingActive.Store(false) + logger.Info("latency tracking disabled") } } -func isNearThreadLimit() bool { - return len(autoScaledThreads) >= cap(autoScaledThreads)*slowThreadPercentile/100 +func isCloseToThreadLimit(threadAmount int, threadLimit int) bool { + return threadAmount >= threadLimit*(100-lowLatencyPercentile)/100 } // record a slow request path @@ -60,16 +70,31 @@ func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTra request := fc.getOriginalRequest() normalizedPath := normalizePath(request.URL.Path) + logger.Debug("slow request detected", "path", normalizedPath, "duration", duration) - slowRequestsMu.Lock() + slowRequestsMu.Lock() // if too many slow paths are tracked, clear the map if len(slowRequestPaths) > maxTrackedPaths { slowRequestPaths = make(map[string]time.Duration) } - // record the latency as a moving average recordedLatency := slowRequestPaths[normalizedPath] + if recordedLatency == 0 && latencyTrackingActive.Load() { + // a new path that is known to be slow is recorded, + // drain some requests to free up low-latency threads + for i := 0; i < maxRequestDrainage; i++ { + select { + case scaleChan <- fc: + _ = isHighLatencyRequest(fc) + default: + // no more queued requests + break + } + } + } + + // record the latency as a moving average slowRequestPaths[normalizedPath] = duration/2 + recordedLatency/2 // remove the path if it is no longer considered slow @@ -88,14 +113,12 @@ func isHighLatencyRequest(fc *frankenPHPContext) bool { normalizedPath := 
normalizePath(fc.getOriginalRequest().URL.Path) slowRequestsMu.RLock() - latency, exists := slowRequestPaths[normalizedPath] + latency := slowRequestPaths[normalizedPath] slowRequestsMu.RUnlock() - if exists { - return latency > slowRequestThreshold - } + fc.isLowLatencyRequest = latency < slowRequestThreshold - return false + return !fc.isLowLatencyRequest } func normalizePath(path string) string { @@ -124,7 +147,7 @@ func normalizePath(path string) string { // determine if a path part is a wildcard func normalizePathPart(part string) string { - if len(part) > charLimitWildcard { + if len(part) > maxPathPartChars { // TODO: better slug detection? return ":slug" } diff --git a/latencytracking_test.go b/latencytracking_test.go index 8a855a345..247880dd8 100644 --- a/latencytracking_test.go +++ b/latencytracking_test.go @@ -53,7 +53,8 @@ func TestTunnelLowLatencyRequest_worker(t *testing.T) { // record request path as slow, manipulate thresholds to make it easy to trigger slowRequestThreshold = 1 * time.Millisecond - slowThreadPercentile = 0 + latencyTrackingEnabled = true + lowLatencyPercentile = 100 scaleWorkerThread(getWorkerByName("worker")) // the scaled thread should be low-latency only assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms", opt) assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms", opt) @@ -90,7 +91,8 @@ func TestTunnelLowLatencyRequest_module(t *testing.T) { // record request path as slow, manipulate thresholds to make it easy to trigger slowRequestThreshold = 1 * time.Millisecond - slowThreadPercentile = 0 + latencyTrackingEnabled = true + lowLatencyPercentile = 100 scaleRegularThread() // the scaled thread should be low-latency only assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms") assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms") diff --git a/options.go b/options.go index 18c5ba20f..50440744d 100644 --- a/options.go +++ b/options.go @@ -19,13 +19,14 @@ type WorkerOption 
func(*workerOpt) error // // If you change this, also update the Caddy module and the documentation. type opt struct { - numThreads int - maxThreads int - workers []workerOpt - logger *slog.Logger - metrics Metrics - phpIni map[string]string - maxWaitTime time.Duration + numThreads int + maxThreads int + workers []workerOpt + logger *slog.Logger + metrics Metrics + phpIni map[string]string + maxWaitTime time.Duration + latencyTracking bool } type workerOpt struct { @@ -54,6 +55,16 @@ func WithMaxThreads(maxThreads int) Option { } } +// TODO: is this possible without a new configuration? +// EXPERIMENTAL: WithLatencyTracking enables latency tracking mode +func WithLatencyTracking(enabled bool) Option { + return func(o *opt) error { + o.latencyTracking = enabled + + return nil + } +} + func WithMetrics(m Metrics) Option { return func(o *opt) error { o.metrics = m diff --git a/scaling.go b/scaling.go index db0102f86..6603d7402 100644 --- a/scaling.go +++ b/scaling.go @@ -63,7 +63,7 @@ func addRegularThread() (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - triggerLatencyTracking(thread) + triggerLatencyTracking(thread, len(autoScaledThreads), cap(autoScaledThreads)) convertToRegularThread(thread) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -74,7 +74,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - triggerLatencyTracking(thread) + triggerLatencyTracking(thread, len(autoScaledThreads), cap(autoScaledThreads)) convertToWorkerThread(thread, worker) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -201,6 +201,11 @@ func deactivateThreads() { continue } + if thread.isLowLatencyThread && len(autoScaledThreads) >= cap(autoScaledThreads) { + // do not downscale low-latency threads if we're at the thread limit + continue + } + // convert threads to inactive if they have been idle for too 
long if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() { convertToInactiveThread(thread) @@ -223,5 +228,5 @@ func deactivateThreads() { // } } - stopLatencyTracking() + stopLatencyTracking(len(autoScaledThreads), cap(autoScaledThreads)) } diff --git a/testdata/performance/perf-test.sh b/testdata/performance/perf-test.sh index 056e8d3d2..8aed36a73 100755 --- a/testdata/performance/perf-test.sh +++ b/testdata/performance/perf-test.sh @@ -7,8 +7,11 @@ docker build -t frankenphp-dev -f dev.Dockerfile . export "CADDY_HOSTNAME=http://host.docker.internal" select filename in ./testdata/performance/*.js; do - read -r -p "How many worker threads? " workerThreads - read -r -p "How many max threads? " maxThreads + read -r -p "How many worker threads (40)? " workerThreads + read -r -p "How many max threads (80)? " maxThreads + + workerThreads=${workerThreads:-40} + maxThreads=${maxThreads:-80} numThreads=$((workerThreads + 1)) @@ -36,5 +39,4 @@ select filename in ./testdata/performance/*.js; do docker exec load-test-container curl "http://localhost:2019/frankenphp/threads" docker stop load-test-container - docker rm load-test-container done diff --git a/threadpool.go b/threadpool.go index 443fb8efc..d31d4ee01 100644 --- a/threadpool.go +++ b/threadpool.go @@ -67,23 +67,24 @@ func (p *threadPool) dispatchRequest(fc *frankenPHPContext) bool { } // dispatch request to all threads, triggering scaling or timeouts as needed -func (p *threadPool) queueRequest(fc *frankenPHPContext, isLowLatencyRequest bool) bool { - var lowLatencyChan chan *frankenPHPContext - if isLowLatencyRequest { - lowLatencyChan = p.lowLatencyChan - } - +func (p *threadPool) queueRequest(fc *frankenPHPContext) bool { var timeoutChan <-chan time.Time if maxWaitTime > 0 { timeoutChan = time.After(maxWaitTime) } for { + var lowLatencyChan chan *frankenPHPContext + if fc.isLowLatencyRequest { + lowLatencyChan = p.lowLatencyChan + } + select { case p.ch <- fc: return true case 
lowLatencyChan <- fc: - return true // 'low laten' + // only low-latency requests can be sent to low-latency threads + return true case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan: diff --git a/threadregular.go b/threadregular.go index 4d46c8521..4ac0354de 100644 --- a/threadregular.go +++ b/threadregular.go @@ -18,7 +18,8 @@ var regularThreadPool *threadPool func initRegularPHPThreads(num int) { regularThreadPool = newThreadPool(num) for i := 0; i < num; i++ { - convertToRegularThread(getInactivePHPThread()) + thread := getInactivePHPThread() + convertToRegularThread(thread) } } @@ -91,7 +92,7 @@ func (handler *regularThread) afterRequest() { func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { metrics.StartRequest() - trackLatency := latencyTrackingEnabled.Load() + trackLatency := latencyTrackingActive.Load() isSlowRequest := trackLatency && isHighLatencyRequest(fc) // dispatch requests to all regular threads in order @@ -104,7 +105,7 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { } metrics.QueuedRequest() - requestWasReceived := regularThreadPool.queueRequest(fc, trackLatency && !isSlowRequest) + requestWasReceived := regularThreadPool.queueRequest(fc) metrics.DequeuedRequest() if !requestWasReceived { diff --git a/worker.go b/worker.go index d5bfd80d5..e3b10c0ad 100644 --- a/worker.go +++ b/worker.go @@ -189,8 +189,7 @@ func getDirectoriesToWatch(workerOpts []workerOpt) []string { func (worker *worker) handleRequest(fc *frankenPHPContext) { metrics.StartWorkerRequest(worker.name) - trackLatency := latencyTrackingEnabled.Load() - isSlowRequest := trackLatency && isHighLatencyRequest(fc) + isSlowRequest := latencyTrackingActive.Load() && isHighLatencyRequest(fc) // dispatch requests to all worker threads in order if !isSlowRequest && worker.threadPool.dispatchRequest(fc) { @@ -203,7 +202,7 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { } 
metrics.QueuedWorkerRequest(worker.name) - requestWasReceived := worker.threadPool.queueRequest(fc, trackLatency && !isSlowRequest) + requestWasReceived := worker.threadPool.queueRequest(fc) metrics.DequeuedWorkerRequest(worker.name) if !requestWasReceived { From 7a1b076d0a9eb0e06825de7d929c38ff21ee5554 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 16:52:52 +0200 Subject: [PATCH 16/19] Adds latency tracking test. --- testdata/performance/extreme-hanging.js | 26 +++++++++++++++++++++++++ testdata/performance/k6.Caddyfile | 1 + 2 files changed, 27 insertions(+) create mode 100644 testdata/performance/extreme-hanging.js diff --git a/testdata/performance/extreme-hanging.js b/testdata/performance/extreme-hanging.js new file mode 100644 index 000000000..6dfae5041 --- /dev/null +++ b/testdata/performance/extreme-hanging.js @@ -0,0 +1,26 @@ +import http from 'k6/http' + +/** + * It is not uncommon for external services to hang for a long time. + * Make sure the server is resilient in such cases and doesn't hang as well. 
+ */ +export const options = { + stages: [ + { duration: '20s', target: 100 }, + { duration: '20s', target: 500 }, + { duration: '20s', target: 0 } + ], + thresholds: { + http_req_failed: ['rate<0.5'] + } +} + +export default function () { + if (__VU % 2 === 0) { + // 50 % of VUs cause hanging + http.get(`${__ENV.CADDY_HOSTNAME}/slow-path?sleep=15000&work=10000&output=100`) + } else { + // The other VUs do very fast requests + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=3&work=1000`) + } +} diff --git a/testdata/performance/k6.Caddyfile b/testdata/performance/k6.Caddyfile index ac19a990f..9363ace92 100644 --- a/testdata/performance/k6.Caddyfile +++ b/testdata/performance/k6.Caddyfile @@ -3,6 +3,7 @@ max_threads {$MAX_THREADS} num_threads {$NUM_THREADS} max_wait_time 30s + latency_tracking php_ini { max_execution_time 30 From b35f93002360ee74f5dddb08d998b82a1b74c8fc Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 17:02:29 +0200 Subject: [PATCH 17/19] linting. --- testdata/performance/extreme-hanging.js | 8 +++++--- testdata/performance/hanging-requests.js | 7 +++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/testdata/performance/extreme-hanging.js b/testdata/performance/extreme-hanging.js index 6dfae5041..95467a52a 100644 --- a/testdata/performance/extreme-hanging.js +++ b/testdata/performance/extreme-hanging.js @@ -15,10 +15,12 @@ export const options = { } } +/* global __VU */ +/* global __ENV */ export default function () { - if (__VU % 2 === 0) { - // 50 % of VUs cause hanging - http.get(`${__ENV.CADDY_HOSTNAME}/slow-path?sleep=15000&work=10000&output=100`) + if (__VU % 50 === 0) { + // 2 % of VUs cause extreme hanging + http.get(`${__ENV.CADDY_HOSTNAME}/slow-path?sleep=60000&work=10000&output=100`) } else { // The other VUs do very fast requests http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=3&work=1000`) diff --git a/testdata/performance/hanging-requests.js b/testdata/performance/hanging-requests.js index 
4c772cc0a..ebc165136 100644 --- a/testdata/performance/hanging-requests.js +++ b/testdata/performance/hanging-requests.js @@ -11,7 +11,7 @@ export const options = { { duration: '20s', target: 0 } ], thresholds: { - http_req_failed: ['rate<0.01'] + http_req_failed: ['rate<0.5'] } } @@ -19,7 +19,10 @@ export const options = { export default function () { // 2% chance for a request that hangs for 15s if (Math.random() < 0.02) { - http.get(`${__ENV.CADDY_HOSTNAME}/slowpath/slow-path?sleep=15000&work=10000&output=100`) + http.get(`${__ENV.CADDY_HOSTNAME}/slow-path?sleep=15000&work=10000&output=100`, { + timeout: 500, // do not wait and continue with the next request + throw: false, + }) return } From 51014177f5aafe26454706838256d71373c8c6aa Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 17:08:50 +0200 Subject: [PATCH 18/19] Fixes break statement. --- latencytracking.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/latencytracking.go b/latencytracking.go index c8e19db57..ba00a4d32 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -83,13 +83,15 @@ func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTra if recordedLatency == 0 && latencyTrackingActive.Load() { // a new path that is known to be slow is recorded, // drain some requests to free up low-latency threads + out: for i := 0; i < maxRequestDrainage; i++ { select { case scaleChan <- fc: _ = isHighLatencyRequest(fc) default: // no more queued requests - break + //break outer loop + break out } } } From db76068dd2d2424a49cb4e6301c10c2d6faee4fe Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sun, 14 Sep 2025 21:21:20 +0200 Subject: [PATCH 19/19] Adjusts comments. 
--- latencytracking.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/latencytracking.go b/latencytracking.go index ba00a4d32..5d55f7f6a 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -18,16 +18,15 @@ const maxPathPartChars = 50 const maxRequestDrainage = 100 var ( - // TODO: test with different values // requests taking longer than this are considered slow (var for tests) - slowRequestThreshold = 1500 * time.Millisecond - // % of initial threads that are marked as low latency threads(var for tests) - lowLatencyPercentile = 25 + slowRequestThreshold = 2000 * time.Millisecond + // % of autoscaled threads that are marked as low latency threads (var for tests) + lowLatencyPercentile = 20 + slowRequestPaths map[string]time.Duration latencyTrackingEnabled = false latencyTrackingActive = atomic.Bool{} slowRequestsMu = sync.RWMutex{} - slowRequestPaths map[string]time.Duration numRe = regexp.MustCompile(`^\d+$`) uuidRe = regexp.MustCompile(`^[a-f0-9-]{36}$`) ) @@ -83,6 +82,7 @@ func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTra if recordedLatency == 0 && latencyTrackingActive.Load() { // a new path that is known to be slow is recorded, // drain some requests to free up low-latency threads + // TODO: make sure this overhead is acceptable out: for i := 0; i < maxRequestDrainage; i++ { select { @@ -96,17 +96,18 @@ func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTra } } - // record the latency as a moving average - slowRequestPaths[normalizedPath] = duration/2 + recordedLatency/2 + movingAverage := duration/2 + recordedLatency/2 + slowRequestPaths[normalizedPath] = movingAverage // remove the path if it is no longer considered slow - if forceTracking && slowRequestPaths[normalizedPath] < slowRequestThreshold { + if forceTracking && movingAverage < slowRequestThreshold { delete(slowRequestPaths, normalizedPath) } + slowRequestsMu.Unlock() } -// determine if a 
request is likely to be high latency based on the request path +// determine if a request is likely to be high latency based on previous requests with the same path func isHighLatencyRequest(fc *frankenPHPContext) bool { if len(slowRequestPaths) == 0 { return false @@ -123,6 +124,11 @@ func isHighLatencyRequest(fc *frankenPHPContext) bool { return !fc.isLowLatencyRequest } +// normalize a path by replacing variable parts with wildcards +// e.g. /user/123/profile -> /user/:id/profile +// +// /post/550e8400-e29b-41d4-a716-446655440000 -> /post/:uuid +// /category/very-long-category-name -> /category/:slug func normalizePath(path string) string { pathLen := len(path) if pathLen > 1 && path[pathLen-1] == '/' {