diff --git a/caddy/app.go b/caddy/app.go
index 01831c533..a0c785619 100644
--- a/caddy/app.go
+++ b/caddy/app.go
@@ -37,6 +37,8 @@ type FrankenPHPApp struct {
 	PhpIni map[string]string `json:"php_ini,omitempty"`
 	// The maximum amount of time a request may be stalled waiting for a thread
 	MaxWaitTime time.Duration `json:"max_wait_time,omitempty"`
+	// Enable latency tracking mode (experimental)
+	LatencyTracking bool `json:"latency_tracking,omitempty"`

 	metrics frankenphp.Metrics
 	logger  *slog.Logger
@@ -117,6 +119,7 @@ func (f *FrankenPHPApp) Start() error {
 		frankenphp.WithMetrics(f.metrics),
 		frankenphp.WithPhpIni(f.PhpIni),
 		frankenphp.WithMaxWaitTime(f.MaxWaitTime),
+		frankenphp.WithLatencyTracking(f.LatencyTracking),
 	}
 	for _, w := range append(f.Workers) {
 		workerOpts := []frankenphp.WorkerOption{
@@ -253,6 +256,9 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {

 				f.Workers = append(f.Workers, wc)
+
+			case "latency_tracking":
+				f.LatencyTracking = true
 			default:
 				allowedDirectives := "num_threads, max_threads, php_ini, worker, max_wait_time, latency_tracking"
 				return wrongSubDirectiveError("frankenphp", allowedDirectives, d.Val())
diff --git a/cgi.go b/cgi.go
index f9aae8690..99bf8f4bc 100644
--- a/cgi.go
+++ b/cgi.go
@@ -131,13 +131,6 @@ func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVa
 	serverPort := reqPort
 	contentLength := request.Header.Get("Content-Length")

-	var requestURI string
-	if fc.originalRequest != nil {
-		requestURI = fc.originalRequest.URL.RequestURI()
-	} else {
-		requestURI = request.URL.RequestURI()
-	}
-
 	C.frankenphp_register_bulk(
 		trackVarsArray,
 		packCgiVariable(keys["REMOTE_ADDR"], ip),
@@ -167,7 +160,7 @@ func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVa
 		packCgiVariable(keys["AUTH_TYPE"], ""),
 		packCgiVariable(keys["REMOTE_IDENT"], ""),
 		// Request uri of the original request
-		packCgiVariable(keys["REQUEST_URI"], requestURI),
+		packCgiVariable(keys["REQUEST_URI"], fc.getOriginalRequest().URL.RequestURI()),
 		packCgiVariable(keys["SSL_CIPHER"], sslCipher),
 	)
diff --git a/context.go b/context.go
index 65aee5b75..f6af45564 100644
--- a/context.go
+++ b/context.go
@@ -26,7 +26,8 @@ type frankenPHPContext struct {
 	scriptFilename string

 	// Whether the request is already closed by us
-	isDone bool
+	isDone              bool
+	isLowLatencyRequest bool

 	responseWriter http.ResponseWriter

@@ -138,6 +139,14 @@ func (fc *frankenPHPContext) clientHasClosed() bool {
 	}
 }

+func (fc *frankenPHPContext) getOriginalRequest() *http.Request {
+	if fc.originalRequest != nil {
+		return fc.originalRequest
+	}
+
+	return fc.request
+}
+
 // reject sends a response with the given status code and message
 func (fc *frankenPHPContext) reject(statusCode int, message string) {
 	if fc.isDone {
diff --git a/debugstate.go b/debugstate.go
index a7941ac79..f9c83ca57 100644
--- a/debugstate.go
+++ b/debugstate.go
@@ -6,8 +6,8 @@ type ThreadDebugState struct {
 	Name                     string
 	State                    string
 	IsWaiting                bool
-	IsBusy                   bool
 	WaitingSinceMilliseconds int64
+	IsLowLatency             bool
 }

 // EXPERIMENTAL: FrankenPHPDebugState prints the state of all PHP threads - debugging purposes only
@@ -40,7 +40,7 @@ func threadDebugState(thread *phpThread) ThreadDebugState {
 		Name:                     thread.name(),
 		State:                    thread.state.name(),
 		IsWaiting:                thread.state.isInWaitingState(),
-		IsBusy:                   !thread.state.isInWaitingState(),
 		WaitingSinceMilliseconds: thread.state.waitTime(),
+		IsLowLatency:             thread.isLowLatencyThread,
 	}
 }
diff --git a/frankenphp.go b/frankenphp.go
index 58cd95b89..2d953f417 100644
--- a/frankenphp.go
+++ b/frankenphp.go
@@ -268,16 +268,14 @@ func Init(options ...Option) error {
 		return err
 	}

-	regularRequestChan = make(chan *frankenPHPContext, totalThreadCount-workerThreadCount)
-	regularThreads = make([]*phpThread, 0, totalThreadCount-workerThreadCount)
-	for i := 0; i < totalThreadCount-workerThreadCount; i++ {
-		convertToRegularThread(getInactivePHPThread())
-	}
+	initRegularPHPThreads(totalThreadCount - workerThreadCount)

 	if err := initWorkers(opt.workers); err != nil {
 		return err
 	}

+	initLatencyTracking(opt.latencyTracking)
+
 	initAutoScaling(mainThread)

 	ctx := context.Background()
@@ -603,11 +601,3 @@ func freeArgs(argv []*C.char) {
 		C.free(unsafe.Pointer(arg))
 	}
 }
-
-func timeoutChan(timeout time.Duration) <-chan time.Time {
-	if timeout == 0 {
-		return nil
-	}
-
-	return time.After(timeout)
-}
diff --git a/latencytracking.go b/latencytracking.go
new file mode 100644
index 000000000..5d55f7f6a
--- /dev/null
+++ b/latencytracking.go
@@ -0,0 +1,172 @@
+package frankenphp
+
+import (
+	"regexp"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// hard limit of tracked paths
+const maxTrackedPaths = 1000
+
+// path parts longer than this are considered a wildcard
+const maxPathPartChars = 50
+
+// maximum number of requests drained when a new slow path is recorded
+const maxRequestDrainage = 100
+
+var (
+	// requests taking longer than this are considered slow (var for tests)
+	slowRequestThreshold = 2000 * time.Millisecond
+	// % of autoscaled threads that are marked as low-latency threads (var for tests)
+	lowLatencyPercentile = 20
+
+	slowRequestPaths       map[string]time.Duration
+	latencyTrackingEnabled = false
+	latencyTrackingActive  = atomic.Bool{}
+	slowRequestsMu         = sync.RWMutex{}
+	numRe                  = regexp.MustCompile(`^\d+$`)
+	uuidRe                 = regexp.MustCompile(`^[a-f0-9-]{36}$`)
+)
+
+func initLatencyTracking(enabled bool) {
+	latencyTrackingActive.Store(false)
+	slowRequestPaths = make(map[string]time.Duration)
+	latencyTrackingEnabled = enabled
+}
+
+// mark a freshly autoscaled thread as low-latency once the thread count approaches the limit
+func triggerLatencyTracking(thread *phpThread, threadAmount int, threadLimit int) {
+	if !latencyTrackingEnabled || !isCloseToThreadLimit(threadAmount, threadLimit) {
+		return
+	}
+
+	thread.isLowLatencyThread = true
+
+	if !latencyTrackingActive.Load() {
+		latencyTrackingActive.Store(true)
+		logger.Info("latency tracking enabled")
+	}
+}
+
+// disable latency tracking again once enough threads have been downscaled
+func stopLatencyTracking(threadAmount int, threadLimit int) {
+	if latencyTrackingActive.Load() && !isCloseToThreadLimit(threadAmount, threadLimit) {
+		latencyTrackingActive.Store(false)
+		logger.Info("latency tracking disabled")
+	}
+}
+
+// e.g. with the default lowLatencyPercentile of 20 and a limit of 100 threads,
+// this returns true once 80 or more threads have been scaled
+func isCloseToThreadLimit(threadAmount int, threadLimit int) bool {
+	return threadAmount >= threadLimit*(100-lowLatencyPercentile)/100
+}
+
+// record a slow request path
+func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTracking bool) {
+	if duration < slowRequestThreshold && !forceTracking {
+		return
+	}
+
+	request := fc.getOriginalRequest()
+	normalizedPath := normalizePath(request.URL.Path)
+
+	logger.Debug("slow request detected", "path", normalizedPath, "duration", duration)
+
+	slowRequestsMu.Lock()
+	// if too many slow paths are tracked, clear the map
+	if len(slowRequestPaths) > maxTrackedPaths {
+		slowRequestPaths = make(map[string]time.Duration)
+	}
+
+	recordedLatency := slowRequestPaths[normalizedPath]
+	if recordedLatency == 0 && latencyTrackingActive.Load() {
+		// a new slow path was just recorded; drain some queued requests
+		// to free up low-latency threads
+		// TODO: make sure this overhead is acceptable
+	out:
+		for i := 0; i < maxRequestDrainage; i++ {
+			select {
+			case scaleChan <- fc:
+				// the path was just detected as slow; flag the context directly
+				// (calling isHighLatencyRequest here would deadlock since it takes
+				// a read lock on slowRequestsMu, which is already held)
+				fc.isLowLatencyRequest = false
+			default:
+				// no more queued requests, stop draining
+				break out
+			}
+		}
+	}
+
+	// keep a simple moving average of the path's latency
+	movingAverage := duration/2 + recordedLatency/2
+	slowRequestPaths[normalizedPath] = movingAverage
+
+	// remove the path if it is no longer considered slow
+	if forceTracking && movingAverage < slowRequestThreshold {
+		delete(slowRequestPaths, normalizedPath)
+	}
+
+	slowRequestsMu.Unlock()
+}
+
+// determine if a request is likely to be high latency based on previous requests with the same path
+func isHighLatencyRequest(fc *frankenPHPContext) bool {
+	// the map is written concurrently, so even the length check
+	// needs to happen under the read lock
+	slowRequestsMu.RLock()
+	if len(slowRequestPaths) == 0 {
+		slowRequestsMu.RUnlock()
+		return false
+	}
+	latency := slowRequestPaths[normalizePath(fc.getOriginalRequest().URL.Path)]
+	slowRequestsMu.RUnlock()
+
+	fc.isLowLatencyRequest = latency < slowRequestThreshold
+
+	return !fc.isLowLatencyRequest
+}
+
+// normalize a path by replacing variable parts with wildcards
+// e.g. /user/123/profile -> /user/:id/profile
+//
+//	/post/550e8400-e29b-41d4-a716-446655440000 -> /post/:uuid
+//	/category/very-long-category-name -> /category/:slug
+func normalizePath(path string) string {
+	pathLen := len(path)
+	if pathLen > 1 && path[pathLen-1] == '/' {
+		pathLen-- // ignore trailing slash for processing
+	}
+
+	var b strings.Builder
+	b.Grow(len(path)) // pre-allocate at least original size
+	start := 0
+	for i := 0; i <= pathLen; i++ {
+		if i == pathLen || path[i] == '/' {
+			if i > start {
+				seg := path[start:i]
+				b.WriteString(normalizePathPart(seg))
+			}
+			if i < pathLen {
+				b.WriteByte('/')
+			}
+			start = i + 1
+		}
+	}
+	return b.String()
+}
+
+// normalize a single path part, replacing variable-looking parts with a wildcard
+func normalizePathPart(part string) string {
+	if len(part) > maxPathPartChars {
+		// TODO: better slug detection?
+		return ":slug"
+	}
+
+	if numRe.MatchString(part) {
+		return ":id"
+	}
+
+	if uuidRe.MatchString(part) {
+		return ":uuid"
+	}
+
+	return part
+}
diff --git a/latencytracking_test.go b/latencytracking_test.go
new file mode 100644
index 000000000..247880dd8
--- /dev/null
+++ b/latencytracking_test.go
@@ -0,0 +1,120 @@
+package frankenphp
+
+import (
+	"log/slog"
+	"net/http/httptest"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap/exp/zapslog"
+	"go.uber.org/zap/zaptest"
+)
+
+func TestNormalizePath(t *testing.T) {
+	tests := []struct {
+		input    string
+		expected string
+	}{
+		{"123", ":id"},
+		{"/*123/456/asd", "/*123/:id/asd"},
+		{"/users/", "/users"},
+		{"/product/550e8400-e29b-41d4-a716-446655440000/", "/product/:uuid"},
+		{"/not/a/uuid/550e8400-e29b-41d4-a716-44665544000Z/", "/not/a/uuid/550e8400-e29b-41d4-a716-44665544000Z"},
+		{"/page/asdfghjk-lkjhgfdsasdfghjkjhgf-dsasdfghjkjhgfdsasdf-ghjkjhgfdsasdfghjkjhgfdsasdfghjkjhgf", "/page/:slug"},
+	}
+
+	for _, test := range tests {
+		normalizedPath := normalizePath(test.input)
+		assert.Equal(t, test.expected, normalizedPath, "normalizePath(%q) = %q; want %q", test.input, normalizedPath, test.expected)
+	}
+}
+
+func assertGetRequest(t *testing.T, url string, expectedBodyContains string, opts ...RequestOption) {
+	t.Helper()
+	r := httptest.NewRequest("GET", url, nil)
+	w := httptest.NewRecorder()
+	req, err := NewRequestWithContext(r, opts...)
+	assert.NoError(t, err)
+	assert.NoError(t, ServeHTTP(w, req))
+	assert.Contains(t, w.Body.String(), expectedBodyContains)
+}
+
+func TestTunnelLowLatencyRequest_worker(t *testing.T) {
+	assert.NoError(t, Init(
+		WithWorkers("worker", "testdata/sleep.php", 1),
+		WithNumThreads(2), // one 'high latency worker thread' (and one unused regular thread)
+		WithMaxThreads(3), // one 'low latency worker thread'
+		WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))),
+	))
+	defer Shutdown()
+	opt := WithWorkerName("worker")
+
+	// record request path as slow, manipulate thresholds to make it easy to trigger
+	slowRequestThreshold = 1 * time.Millisecond
+	latencyTrackingEnabled = true
+	lowLatencyPercentile = 100
+	scaleWorkerThread(getWorkerByName("worker")) // the scaled thread should be low-latency only
+	assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms", opt)
+	assertGetRequest(t, "/slow/123/path?sleep=5", "slept for 5 ms", opt)
+
+	// send 2 blocking requests that occupy all threads
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	for i := 0; i < 2; i++ {
+		go func() {
+			assertGetRequest(t, "/slow/123/path?sleep=500", "slept for 500 ms", opt)
+			wg.Done()
+		}()
+	}
+	time.Sleep(time.Millisecond * 100) // enough time to receive the requests
+
+	// send a low latency request, it should not be blocked by the slow requests
+	start := time.Now()
+	assertGetRequest(t, "/fast/123/path?sleep=0", "slept for 0 ms", opt)
+	duration := time.Since(start)
+
+	assert.Less(t, duration.Milliseconds(), int64(100), "the low latency request should not be blocked by the slow requests")
+
+	// wait to avoid race conditions across tests
+	wg.Wait()
+}
+
+func TestTunnelLowLatencyRequest_module(t *testing.T) {
+	assert.NoError(t, Init(
+		WithNumThreads(1), // one 'high latency thread'
+		WithMaxThreads(2), // one 'low latency thread'
+		WithLogger(slog.New(zapslog.NewHandler(zaptest.NewLogger(t).Core()))),
+	))
+	defer Shutdown()
+
+	// record request path as slow, manipulate thresholds to make it easy to trigger
+	slowRequestThreshold = 1 * time.Millisecond
+	latencyTrackingEnabled = true
+	lowLatencyPercentile = 100
+	scaleRegularThread() // the scaled thread should be low-latency only
+	assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms")
+	assertGetRequest(t, "/testdata/sleep.php?sleep=5", "slept for 5 ms")
+
+	// send 2 blocking requests that occupy all threads
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	for i := 0; i < 2; i++ {
+		go func() {
+			assertGetRequest(t, "/testdata/sleep.php?sleep=500", "slept for 500 ms")
+			wg.Done()
+		}()
+	}
+	time.Sleep(time.Millisecond * 100) // enough time to receive the requests
+
+	// send a low latency request, it should not be blocked by the slow requests
+	start := time.Now()
+	assertGetRequest(t, "/testdata/sleep.php/fastpath/?sleep=0", "slept for 0 ms")
+	duration := time.Since(start)
+
+	assert.Less(t, duration.Milliseconds(), int64(100), "the low latency request should not be blocked by the slow requests")
+
+	// wait to avoid race conditions across tests
+	wg.Wait()
+}
diff --git a/options.go b/options.go
index 18c5ba20f..50440744d 100644
--- a/options.go
+++ b/options.go
@@ -19,13 +19,14 @@ type WorkerOption func(*workerOpt) error

 //
 // If you change this, also update the Caddy module and the documentation.
 type opt struct {
-	numThreads  int
-	maxThreads  int
-	workers     []workerOpt
-	logger      *slog.Logger
-	metrics     Metrics
-	phpIni      map[string]string
-	maxWaitTime time.Duration
+	numThreads      int
+	maxThreads      int
+	workers         []workerOpt
+	logger          *slog.Logger
+	metrics         Metrics
+	phpIni          map[string]string
+	maxWaitTime     time.Duration
+	latencyTracking bool
 }

 type workerOpt struct {
@@ -54,6 +55,16 @@ func WithMaxThreads(maxThreads int) Option {
 	}
 }

+// TODO: is this possible without a new configuration?
+// EXPERIMENTAL: WithLatencyTracking enables low-latency tracking mode
+func WithLatencyTracking(enabled bool) Option {
+	return func(o *opt) error {
+		o.latencyTracking = enabled
+
+		return nil
+	}
+}
+
 func WithMetrics(m Metrics) Option {
 	return func(o *opt) error {
 		o.metrics = m
diff --git a/phpmainthread_test.go b/phpmainthread_test.go
index d5e801422..fd69dbc2f 100644
--- a/phpmainthread_test.go
+++ b/phpmainthread_test.go
@@ -45,12 +45,12 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
 	worker := getDummyWorker("transition-worker-1.php")
 	convertToWorkerThread(phpThreads[0], worker)
 	assert.IsType(t, &workerThread{}, phpThreads[0].handler)
-	assert.Len(t, worker.threads, 1)
+	assert.Len(t, worker.threadPool.threads, 1)

 	// transition back to inactive thread
 	convertToInactiveThread(phpThreads[0])
 	assert.IsType(t, &inactiveThread{}, phpThreads[0].handler)
-	assert.Len(t, worker.threads, 0)
+	assert.Len(t, worker.threadPool.threads, 0)

 	drainPHPThreads()
 	assert.Nil(t, phpThreads)
@@ -68,15 +68,15 @@ func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
 	convertToWorkerThread(phpThreads[0], firstWorker)
 	firstHandler := phpThreads[0].handler.(*workerThread)
 	assert.Same(t, firstWorker, firstHandler.worker)
-	assert.Len(t, firstWorker.threads, 1)
-	assert.Len(t, secondWorker.threads, 0)
+	assert.Len(t, firstWorker.threadPool.threads, 1)
+	assert.Len(t, secondWorker.threadPool.threads, 0)

 	// convert to second worker thread
 	convertToWorkerThread(phpThreads[0], secondWorker)
 	secondHandler := phpThreads[0].handler.(*workerThread)
 	assert.Same(t, secondWorker, secondHandler.worker)
-	assert.Len(t, firstWorker.threads, 0)
-	assert.Len(t, secondWorker.threads, 1)
+	assert.Len(t, firstWorker.threadPool.threads, 0)
+	assert.Len(t, secondWorker.threadPool.threads, 1)

 	drainPHPThreads()
 	assert.Nil(t, phpThreads)
diff --git a/phpthread.go b/phpthread.go
index a60aa8f0a..0c7140b05 100644
--- a/phpthread.go
+++ b/phpthread.go
@@ -15,13 +15,14 @@ import (
 // identified by the index in the phpThreads slice
 type phpThread struct {
 	runtime.Pinner
-	threadIndex  int
-	requestChan  chan *frankenPHPContext
-	drainChan    chan struct{}
-	handlerMu    sync.Mutex
-	handler      threadHandler
-	state        *threadState
-	sandboxedEnv map[string]*C.zend_string
+	threadIndex        int
+	requestChan        chan *frankenPHPContext
+	drainChan          chan struct{}
+	handlerMu          sync.Mutex
+	handler            threadHandler
+	state              *threadState
+	sandboxedEnv       map[string]*C.zend_string
+	isLowLatencyThread bool
 }

 // interface that defines how the callbacks from the C thread should be handled
diff --git a/scaling.go b/scaling.go
index 57e6c598b..6603d7402 100644
--- a/scaling.go
+++ b/scaling.go
@@ -63,6 +63,7 @@ func addRegularThread() (*phpThread, error) {
 	if thread == nil {
 		return nil, ErrMaxThreadsReached
 	}
+	triggerLatencyTracking(thread, len(autoScaledThreads), cap(autoScaledThreads))
 	convertToRegularThread(thread)
 	thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
 	return thread, nil
@@ -73,6 +74,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) {
 	if thread == nil {
 		return nil, ErrMaxThreadsReached
 	}
+	triggerLatencyTracking(thread, len(autoScaledThreads), cap(autoScaledThreads))
 	convertToWorkerThread(thread, worker)
 	thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
 	return thread, nil
@@ -199,6 +201,11 @@ func deactivateThreads() {
 			continue
 		}

+		if thread.isLowLatencyThread && len(autoScaledThreads) >= cap(autoScaledThreads) {
+			// do not downscale low-latency threads if we're at the thread limit
+			continue
+		}
+
 		// convert threads to inactive if they have been idle for too long
 		if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() {
 			convertToInactiveThread(thread)
@@ -220,4 +227,6 @@ func deactivateThreads() {
 		//	continue
 		// }
 	}
+
+	stopLatencyTracking(len(autoScaledThreads), cap(autoScaledThreads))
 }
diff --git a/testdata/performance/.gitignore b/testdata/performance/.gitignore
new file mode 100644
index 000000000..2b3984cf9
--- /dev/null
+++ b/testdata/performance/.gitignore
@@ -0,0 +1 @@
+flamegraph.svg
\ No newline at end of file
diff --git a/testdata/performance/extreme-hanging.js b/testdata/performance/extreme-hanging.js
new file mode 100644
index 000000000..95467a52a
--- /dev/null
+++ b/testdata/performance/extreme-hanging.js
@@ -0,0 +1,28 @@
+import http from 'k6/http'
+
+/**
+ * It is not uncommon for external services to hang for a long time.
+ * Make sure the server is resilient in such cases and doesn't hang as well.
+ */
+export const options = {
+  stages: [
+    { duration: '20s', target: 100 },
+    { duration: '20s', target: 500 },
+    { duration: '20s', target: 0 }
+  ],
+  thresholds: {
+    http_req_failed: ['rate<0.5']
+  }
+}
+
+/* global __VU */
+/* global __ENV */
+export default function () {
+  if (__VU % 50 === 0) {
+    // every 50th VU (2 %) causes extreme hanging
+    http.get(`${__ENV.CADDY_HOSTNAME}/slow-path?sleep=60000&work=10000&output=100`)
+  } else {
+    // the other VUs do very fast requests
+    http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=3&work=1000`)
+  }
+}
diff --git a/testdata/performance/hanging-requests.js b/testdata/performance/hanging-requests.js
index db191fdef..ebc165136 100644
--- a/testdata/performance/hanging-requests.js
+++ b/testdata/performance/hanging-requests.js
@@ -11,7 +11,7 @@ export const options = {
     { duration: '20s', target: 0 }
   ],
   thresholds: {
-    http_req_failed: ['rate<0.01']
+    http_req_failed: ['rate<0.5']
   }
 }

@@ -19,7 +19,10 @@ export const options = {
 export default function () {
   // 2% chance for a request that hangs for 15s
   if (Math.random() < 0.02) {
-    http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=15000&work=10000&output=100`)
+    http.get(`${__ENV.CADDY_HOSTNAME}/slow-path?sleep=15000&work=10000&output=100`, {
+      timeout: 500, // don't wait, continue with the next request
+      throw: false,
+    })
     return
   }

diff --git a/testdata/performance/k6.Caddyfile b/testdata/performance/k6.Caddyfile
index 866bd6619..9363ace92 100644
--- a/testdata/performance/k6.Caddyfile
+++ b/testdata/performance/k6.Caddyfile
@@ -2,18 +2,24 @@
 	frankenphp {
 		max_threads {$MAX_THREADS}
 		num_threads {$NUM_THREADS}
-		worker {
-			file /go/src/app/testdata/{$WORKER_FILE:sleep.php}
-			num {$WORKER_THREADS}
+		max_wait_time 30s
+		latency_tracking
+
+		php_ini {
+			max_execution_time 30
 		}
 	}
 }

 :80 {
 	route {
-		root /go/src/app/testdata
 		php {
 			root /go/src/app/testdata
+			worker {
+				file /go/src/app/testdata/{$WORKER_FILE:sleep.php}
+				num {$WORKER_THREADS}
+				match *
+			}
 		}
 	}
 }
diff --git a/testdata/performance/perf-test.sh b/testdata/performance/perf-test.sh
index f69585c54..8aed36a73 100755
--- a/testdata/performance/perf-test.sh
+++ b/testdata/performance/perf-test.sh
@@ -7,14 +7,18 @@ docker build -t frankenphp-dev -f dev.Dockerfile .
 export "CADDY_HOSTNAME=http://host.docker.internal"

 select filename in ./testdata/performance/*.js; do
-	read -r -p "How many worker threads? " workerThreads
-	read -r -p "How many max threads? " maxThreads
+	read -r -p "How many worker threads (40)? " workerThreads
+	read -r -p "How many max threads (80)? " maxThreads
+
+	workerThreads=${workerThreads:-40}
+	maxThreads=${maxThreads:-80}

 	numThreads=$((workerThreads + 1))

 	docker run --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
 		-p 8125:80 \
 		-v "$PWD:/go/src/app" \
+		--rm \
 		--name load-test-container \
 		-e "MAX_THREADS=$maxThreads" \
 		-e "WORKER_THREADS=$workerThreads" \
@@ -35,5 +39,4 @@ select filename in ./testdata/performance/*.js; do
 	docker exec load-test-container curl "http://localhost:2019/frankenphp/threads"

 	docker stop load-test-container
-	docker rm load-test-container
 done
diff --git a/testdata/performance/performance-testing.md b/testdata/performance/performance-testing.md
index 717941a0a..fa8f7537b 100644
--- a/testdata/performance/performance-testing.md
+++ b/testdata/performance/performance-testing.md
@@ -10,10 +10,3 @@ bash testdata/performance/perf-test.sh
 This will build the `frankenphp-dev` Docker image and run it under the name 'load-test-container' in the background.
 Additionally, it will run the `grafana/k6` container, and you'll be able to choose the load test you want to run.
 A `flamegraph.svg` will be created in the `testdata/performance` directory.
-
-If the load test has stopped prematurely, you might have to remove the container manually:
-
-```sh
-docker stop load-test-container
-docker rm load-test-container
-```
diff --git a/testdata/performance/start-server.sh b/testdata/performance/start-server.sh
index 998c148cc..c968ada68 100755
--- a/testdata/performance/start-server.sh
+++ b/testdata/performance/start-server.sh
@@ -2,6 +2,6 @@
 # build and run FrankenPHP with the k6.Caddyfile
 cd /go/src/app/caddy/frankenphp &&
-	go build --buildvcs=false &&
+	../../go.sh build --buildvcs=false &&
 	cd ../../testdata/performance &&
 	/go/src/app/caddy/frankenphp/frankenphp run -c k6.Caddyfile
diff --git a/threadpool.go b/threadpool.go
new file mode 100644
index 000000000..d31d4ee01
--- /dev/null
+++ b/threadpool.go
@@ -0,0 +1,96 @@
+package frankenphp
+
+import (
+	"sync"
+	"time"
+)
+
+// threadPool manages a pool of PHP threads
+// used for both worker and regular threads
+type threadPool struct {
+	threads        []*phpThread
+	mu             sync.RWMutex
+	ch             chan *frankenPHPContext
+	lowLatencyChan chan *frankenPHPContext
+}
+
+func newThreadPool(capacity int) *threadPool {
+	return &threadPool{
+		threads:        make([]*phpThread, 0, capacity),
+		mu:             sync.RWMutex{},
+		ch:             make(chan *frankenPHPContext),
+		lowLatencyChan: make(chan *frankenPHPContext),
+	}
+}
+
+func (p *threadPool) attach(thread *phpThread) {
+	p.mu.Lock()
+	p.threads = append(p.threads, thread)
+	p.mu.Unlock()
+}
+
+func (p *threadPool) detach(thread *phpThread) {
+	p.mu.Lock()
+	for i := len(p.threads) - 1; i >= 0; i-- {
+		if thread == p.threads[i] {
+			p.threads = append(p.threads[:i], p.threads[i+1:]...)
+			break
+		}
+	}
+	p.mu.Unlock()
+}
+
+// get the correct request chan for queued requests
+func (p *threadPool) requestChan(thread *phpThread) chan *frankenPHPContext {
+	if thread.isLowLatencyThread {
+		return p.lowLatencyChan
+	}
+	return p.ch
+}
+
+// try each thread in order and dispatch to the first one that is available
+// dispatching in order minimizes memory usage and external connections
+func (p *threadPool) dispatchRequest(fc *frankenPHPContext) bool {
+	p.mu.RLock()
+	for _, thread := range p.threads {
+		select {
+		case thread.requestChan <- fc:
+			p.mu.RUnlock()
+			return true
+		default:
+			// thread is busy, continue
+		}
+	}
+	p.mu.RUnlock()

+	return false
+}
+
+// queue the request until a thread picks it up, triggering scaling or a timeout as needed
+func (p *threadPool) queueRequest(fc *frankenPHPContext) bool {
+	var timeoutChan <-chan time.Time
+	if maxWaitTime > 0 {
+		timeoutChan = time.After(maxWaitTime)
+	}
+
+	// only low-latency requests can be sent to low-latency threads
+	// (a nil channel is never selected)
+	var lowLatencyChan chan *frankenPHPContext
+	if fc.isLowLatencyRequest {
+		lowLatencyChan = p.lowLatencyChan
+	}
+
+	for {
+		select {
+		case p.ch <- fc:
+			return true
+		case lowLatencyChan <- fc:
+			return true
+		case scaleChan <- fc:
+			// the request has triggered scaling, continue to wait for a thread
+		case <-timeoutChan:
+			// the request timed out while stalled waiting for a thread
+			fc.reject(504, "Gateway Timeout")
+			return false
+		}
+	}
+}
diff --git a/threadregular.go b/threadregular.go
index 88cef7e79..4ac0354de 100644
--- a/threadregular.go
+++ b/threadregular.go
@@ -1,7 +1,7 @@
 package frankenphp

 import (
-	"sync"
+	"time"
 )

 // representation of a non-worker PHP thread
@@ -13,25 +13,29 @@ type regularThread struct {
 	requestContext *frankenPHPContext
 }

-var (
-	regularThreads     []*phpThread
-	regularThreadMu    = &sync.RWMutex{}
-	regularRequestChan chan *frankenPHPContext
-)
+var regularThreadPool *threadPool
+
+func initRegularPHPThreads(num int) {
+	regularThreadPool = newThreadPool(num)
+	for i := 0; i < num; i++ {
+		thread := getInactivePHPThread()
+		convertToRegularThread(thread)
+	}
+}

 func convertToRegularThread(thread *phpThread) {
 	thread.setHandler(&regularThread{
 		thread: thread,
 		state:  thread.state,
 	})
-	attachRegularThread(thread)
+	regularThreadPool.attach(thread)
 }

 // beforeScriptExecution returns the name of the script or an empty string on shutdown
 func (handler *regularThread) beforeScriptExecution() string {
 	switch handler.state.get() {
 	case stateTransitionRequested:
-		detachRegularThread(handler.thread)
+		regularThreadPool.detach(handler.thread)
 		return handler.thread.transitionToNewHandler()
 	case stateTransitionComplete:
 		handler.state.set(stateReady)
@@ -39,7 +43,7 @@ func (handler *regularThread) beforeScriptExecution() string {
 	case stateReady:
 		return handler.waitForRequest()
 	case stateShuttingDown:
-		detachRegularThread(handler.thread)
+		regularThreadPool.detach(handler.thread)
 		// signal to stop
 		return ""
 	}
@@ -59,17 +63,18 @@ func (handler *regularThread) name() string {
 }

 func (handler *regularThread) waitForRequest() string {
-	// clear any previously sandboxed env
-	clearSandboxedEnv(handler.thread)
+	thread := handler.thread
+	clearSandboxedEnv(thread) // clear any previously sandboxed env

 	handler.state.markAsWaiting(true)

 	var fc *frankenPHPContext
 	select {
-	case <-handler.thread.drainChan:
+	case <-thread.drainChan:
 		// go back to beforeScriptExecution
 		return handler.beforeScriptExecution()
-	case fc = <-regularRequestChan:
+	case fc = <-thread.requestChan:
+	case fc = <-regularThreadPool.requestChan(thread):
 	}

 	handler.requestContext = fc
@@ -86,49 +91,29 @@ func (handler *regularThread) afterRequest() {

 func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) {
 	metrics.StartRequest()
-	select {
-	case regularRequestChan <- fc:
-		// a thread was available to handle the request immediately
+
+	trackLatency := latencyTrackingActive.Load()
+	isSlowRequest := trackLatency && isHighLatencyRequest(fc)
+
+	// dispatch requests to all regular threads in order
+	if !isSlowRequest && regularThreadPool.dispatchRequest(fc) {
 		<-fc.done
 		metrics.StopRequest()
+		trackRequestLatency(fc, time.Since(fc.startedAt), false)
+
 		return
-	default:
-		// no thread was available
 	}

-	// if no thread was available, mark the request as queued and fan it out to all threads
 	metrics.QueuedRequest()
-	for {
-		select {
-		case regularRequestChan <- fc:
-			metrics.DequeuedRequest()
-			<-fc.done
-			metrics.StopRequest()
-			return
-		case scaleChan <- fc:
-			// the request has triggered scaling, continue to wait for a thread
-		case <-timeoutChan(maxWaitTime):
-			// the request has timed out stalling
-			metrics.DequeuedRequest()
-			fc.reject(504, "Gateway Timeout")
-			return
-		}
-	}
-}
+	requestWasReceived := regularThreadPool.queueRequest(fc)
+	metrics.DequeuedRequest()

-func attachRegularThread(thread *phpThread) {
-	regularThreadMu.Lock()
-	regularThreads = append(regularThreads, thread)
-	regularThreadMu.Unlock()
-}
-
-func detachRegularThread(thread *phpThread) {
-	regularThreadMu.Lock()
-	for i, t := range regularThreads {
-		if t == thread {
-			regularThreads = append(regularThreads[:i], regularThreads[i+1:]...)
-			break
-		}
+	if !requestWasReceived {
+		return
 	}
-	regularThreadMu.Unlock()
+
+	stallTime := time.Since(fc.startedAt)
+	<-fc.done
+	metrics.StopRequest()
+	trackRequestLatency(fc, time.Since(fc.startedAt)-stallTime, isSlowRequest)
 }
diff --git a/threadworker.go b/threadworker.go
index b7dc82036..ff1ce5908 100644
--- a/threadworker.go
+++ b/threadworker.go
@@ -33,14 +33,14 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
 			maxConsecutiveFailures: worker.maxConsecutiveFailures,
 		},
 	})
-	worker.attachThread(thread)
+	worker.threadPool.attach(thread)
 }

 // beforeScriptExecution returns the name of the script or an empty string on shutdown
 func (handler *workerThread) beforeScriptExecution() string {
 	switch handler.state.get() {
 	case stateTransitionRequested:
-		handler.worker.detachThread(handler.thread)
+		handler.worker.threadPool.detach(handler.thread)
 		return handler.thread.transitionToNewHandler()
 	case stateRestarting:
 		handler.state.set(stateYielding)
@@ -50,7 +50,7 @@ func (handler *workerThread) beforeScriptExecution() string {
 		setupWorkerScript(handler, handler.worker)
 		return handler.worker.fileName
 	case stateShuttingDown:
-		handler.worker.detachThread(handler.thread)
+		handler.worker.threadPool.detach(handler.thread)
 		// signal to stop
 		return ""
 	}
@@ -145,7 +145,8 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {

 // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
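+// Threads flagged as isLowLatencyThread dequeue queued requests from the pool's
+// lowLatencyChan (see threadPool.requestChan), so they only pick up requests whose
+// normalized path is not currently recorded as slow; requests dispatched directly
+// to a thread's own requestChan bypass this routing.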
 func (handler *workerThread) waitForWorkerRequest() bool {
 	// unpin any memory left over from previous requests
-	handler.thread.Unpin()
+	thread := handler.thread
+	thread.Unpin()

 	ctx := context.Background()
 	logger.LogAttrs(ctx, slog.LevelDebug, "waiting for request", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
@@ -170,8 +171,8 @@ func (handler *workerThread) waitForWorkerRequest() bool {

 	var fc *frankenPHPContext
 	select {
-	case <-handler.thread.drainChan:
-		logger.LogAttrs(ctx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
+	case <-thread.drainChan:
+		logger.LogAttrs(ctx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", thread.threadIndex))

 		// flush the opcache when restarting due to watcher or admin api
 		// note: this is done right before frankenphp_handle_request() returns 'false'
@@ -180,14 +181,14 @@ func (handler *workerThread) waitForWorkerRequest() bool {
 		}

 		return false
-	case fc = <-handler.thread.requestChan:
-	case fc = <-handler.worker.requestChan:
+	case fc = <-thread.requestChan:
+	case fc = <-handler.worker.threadPool.requestChan(thread):
 	}

 	handler.workerContext = fc
 	handler.state.markAsWaiting(false)

-	logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", fc.request.RequestURI))
+	logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))

 	return true
 }
diff --git a/worker.go b/worker.go
index 04772fa4a..e3b10c0ad 100644
--- a/worker.go
+++ b/worker.go
@@ -18,9 +18,7 @@ type worker struct {
 	fileName               string
 	num                    int
 	env                    PreparedEnv
-	requestChan            chan *frankenPHPContext
-	threads                []*phpThread
-	threadMutex            sync.RWMutex
+	threadPool             *threadPool
 	allowPathMatching      bool
 	maxConsecutiveFailures int
 }
@@ -121,8 +119,7 @@ func newWorker(o workerOpt) (*worker, error) {
 		fileName:               absFileName,
 		num:                    o.num,
 		env:                    o.env,
-		requestChan:            make(chan *frankenPHPContext),
-		threads:                make([]*phpThread, 0, o.num),
+		threadPool:             newThreadPool(o.num),
 		allowPathMatching:      allowPathMatching,
 		maxConsecutiveFailures: o.maxConsecutiveFailures,
 	}
@@ -139,9 +136,9 @@ func drainWorkerThreads() []*phpThread {
 	ready := sync.WaitGroup{}
 	drainedThreads := make([]*phpThread, 0)
 	for _, worker := range workers {
-		worker.threadMutex.RLock()
-		ready.Add(len(worker.threads))
-		for _, thread := range worker.threads {
+		worker.threadPool.mu.RLock()
+		ready.Add(len(worker.threadPool.threads))
+		for _, thread := range worker.threadPool.threads {
 			if !thread.state.requestSafeStateChange(stateRestarting) {
 				// no state change allowed == thread is shutting down
 				// we'll proceed to restart all other threads anyways
@@ -154,7 +151,7 @@ func drainWorkerThreads() []*phpThread {
 				ready.Done()
 			}(thread)
 		}
-		worker.threadMutex.RUnlock()
+		worker.threadPool.mu.RUnlock()
 	}

 	ready.Wait()
@@ -189,65 +186,32 @@ func getDirectoriesToWatch(workerOpts []workerOpt) []string {
 	return directoriesToWatch
 }

-func (worker *worker) attachThread(thread *phpThread) {
-	worker.threadMutex.Lock()
-	worker.threads = append(worker.threads, thread)
-	worker.threadMutex.Unlock()
-}
-
-func (worker *worker) detachThread(thread *phpThread) {
-	worker.threadMutex.Lock()
-	for i, t := range worker.threads {
-		if t == thread {
-			worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
-			break
-		}
-	}
-	worker.threadMutex.Unlock()
-}
-
-func (worker *worker) countThreads() int {
-	worker.threadMutex.RLock()
-	l := len(worker.threads)
-	worker.threadMutex.RUnlock()
-
-	return l
-}
-
 func (worker *worker) handleRequest(fc *frankenPHPContext) {
 	metrics.StartWorkerRequest(worker.name)

+	isSlowRequest := latencyTrackingActive.Load() && isHighLatencyRequest(fc)
+
 	// dispatch requests to all worker threads in order
-	worker.threadMutex.RLock()
-	for _, thread := range worker.threads {
-		select {
-		case thread.requestChan <- fc:
-			worker.threadMutex.RUnlock()
-			<-fc.done
-			metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
-			return
-		default:
-			// thread is busy, continue
-		}
+	if !isSlowRequest && worker.threadPool.dispatchRequest(fc) {
+		<-fc.done
+		requestTime := time.Since(fc.startedAt)
+		metrics.StopWorkerRequest(worker.name, requestTime)
+		trackRequestLatency(fc, requestTime, false)
+
+		return
 	}
-	worker.threadMutex.RUnlock()

-	// if no thread was available, mark the request as queued and apply the scaling strategy
 	metrics.QueuedWorkerRequest(worker.name)
-	for {
-		select {
-		case worker.requestChan <- fc:
-			metrics.DequeuedWorkerRequest(worker.name)
-			<-fc.done
-			metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
-			return
-		case scaleChan <- fc:
-			// the request has triggered scaling, continue to wait for a thread
-		case <-timeoutChan(maxWaitTime):
-			metrics.DequeuedWorkerRequest(worker.name)
-			// the request has timed out stalling
-			fc.reject(504, "Gateway Timeout")
-			return
-		}
+	requestWasReceived := worker.threadPool.queueRequest(fc)
+	metrics.DequeuedWorkerRequest(worker.name)
+
+	if !requestWasReceived {
+		return
 	}
+
+	stallDuration := time.Since(fc.startedAt)
+	<-fc.done
+	requestTime := time.Since(fc.startedAt)
+	metrics.StopWorkerRequest(worker.name, requestTime)
+	trackRequestLatency(fc, requestTime-stallDuration, isSlowRequest)
 }
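
Usage sketch (not part of the diff): with this changeset, the experimental mode is enabled either via the `latency_tracking` Caddyfile directive shown in k6.Caddyfile above, or via the new `WithLatencyTracking` option when embedding FrankenPHP. The snippet below assumes the existing embedding API (`NewRequestWithContext`, `WithRequestDocumentRoot`, `ServeHTTP`); the thread counts and document root are illustrative.

```go
package main

import (
	"log"
	"net/http"

	"github.com/dunglas/frankenphp"
)

func main() {
	if err := frankenphp.Init(
		frankenphp.WithNumThreads(16),
		// autoscaling headroom; with the default lowLatencyPercentile of 20,
		// roughly the last 20% of scaled threads become low-latency threads
		frankenphp.WithMaxThreads(64),
		frankenphp.WithLatencyTracking(true), // EXPERIMENTAL: enables slow-path tracking
	); err != nil {
		log.Fatal(err)
	}
	defer frankenphp.Shutdown()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		req, err := frankenphp.NewRequestWithContext(r,
			frankenphp.WithRequestDocumentRoot("/path/to/app/public", false))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if err := frankenphp.ServeHTTP(w, req); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}
```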