Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions caddy/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type FrankenPHPApp struct {
PhpIni map[string]string `json:"php_ini,omitempty"`
// The maximum amount of time a request may be stalled waiting for a thread
MaxWaitTime time.Duration `json:"max_wait_time,omitempty"`
// Enable latency tracking mode (experimental)
LatencyTracking bool `json:"low_latency_tracking,omitempty"`

metrics frankenphp.Metrics
logger *slog.Logger
Expand Down Expand Up @@ -117,6 +119,7 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithMetrics(f.metrics),
frankenphp.WithPhpIni(f.PhpIni),
frankenphp.WithMaxWaitTime(f.MaxWaitTime),
frankenphp.WithLatencyTracking(f.LatencyTracking),
}
for _, w := range append(f.Workers) {
workerOpts := []frankenphp.WorkerOption{
Expand Down Expand Up @@ -253,6 +256,9 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}

f.Workers = append(f.Workers, wc)

case "latency_tracking":
f.LatencyTracking = true
default:
allowedDirectives := "num_threads, max_threads, php_ini, worker, max_wait_time"
return wrongSubDirectiveError("frankenphp", allowedDirectives, d.Val())
Expand Down
9 changes: 1 addition & 8 deletions cgi.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,6 @@ func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVa
serverPort := reqPort
contentLength := request.Header.Get("Content-Length")

var requestURI string
if fc.originalRequest != nil {
requestURI = fc.originalRequest.URL.RequestURI()
} else {
requestURI = request.URL.RequestURI()
}

C.frankenphp_register_bulk(
trackVarsArray,
packCgiVariable(keys["REMOTE_ADDR"], ip),
Expand Down Expand Up @@ -167,7 +160,7 @@ func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVa
packCgiVariable(keys["AUTH_TYPE"], ""),
packCgiVariable(keys["REMOTE_IDENT"], ""),
// Request uri of the original request
packCgiVariable(keys["REQUEST_URI"], requestURI),
packCgiVariable(keys["REQUEST_URI"], fc.getOriginalRequest().URL.RequestURI()),
packCgiVariable(keys["SSL_CIPHER"], sslCipher),
)

Expand Down
11 changes: 10 additions & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type frankenPHPContext struct {
scriptFilename string

// Whether the request is already closed by us
isDone bool
isDone bool
isLowLatencyRequest bool

responseWriter http.ResponseWriter

Expand Down Expand Up @@ -138,6 +139,14 @@ func (fc *frankenPHPContext) clientHasClosed() bool {
}
}

func (fc *frankenPHPContext) getOriginalRequest() *http.Request {
if fc.originalRequest != nil {
return fc.originalRequest
}

return fc.request
}

// reject sends a response with the given status code and message
func (fc *frankenPHPContext) reject(statusCode int, message string) {
if fc.isDone {
Expand Down
4 changes: 2 additions & 2 deletions debugstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ type ThreadDebugState struct {
Name string
State string
IsWaiting bool
IsBusy bool
WaitingSinceMilliseconds int64
IsLowLatency bool
}

// EXPERIMENTAL: FrankenPHPDebugState prints the state of all PHP threads - debugging purposes only
Expand Down Expand Up @@ -40,7 +40,7 @@ func threadDebugState(thread *phpThread) ThreadDebugState {
Name: thread.name(),
State: thread.state.name(),
IsWaiting: thread.state.isInWaitingState(),
IsBusy: !thread.state.isInWaitingState(),
WaitingSinceMilliseconds: thread.state.waitTime(),
IsLowLatency: thread.isLowLatencyThread,
}
}
16 changes: 3 additions & 13 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,14 @@ func Init(options ...Option) error {
return err
}

regularRequestChan = make(chan *frankenPHPContext, totalThreadCount-workerThreadCount)
regularThreads = make([]*phpThread, 0, totalThreadCount-workerThreadCount)
for i := 0; i < totalThreadCount-workerThreadCount; i++ {
convertToRegularThread(getInactivePHPThread())
}
initRegularPHPThreads(totalThreadCount - workerThreadCount)

if err := initWorkers(opt.workers); err != nil {
return err
}

initLatencyTracking(opt.latencyTracking)

initAutoScaling(mainThread)

ctx := context.Background()
Expand Down Expand Up @@ -603,11 +601,3 @@ func freeArgs(argv []*C.char) {
C.free(unsafe.Pointer(arg))
}
}

func timeoutChan(timeout time.Duration) <-chan time.Time {
if timeout == 0 {
return nil
}

return time.After(timeout)
}
172 changes: 172 additions & 0 deletions latencytracking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package frankenphp

import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
)

// hard limit of tracked paths
const maxTrackedPaths = 1000

// path parts longer than this are considered a wildcard
const maxPathPartChars = 50

// max amount of requests being drained when a new slow path is recorded
const maxRequestDrainage = 100

var (
// requests taking longer than this are considered slow (var for tests)
slowRequestThreshold = 2000 * time.Millisecond
// % of autoscaled threads that are marked as low latency threads(var for tests)
lowLatencyPercentile = 20

slowRequestPaths map[string]time.Duration
latencyTrackingEnabled = false
latencyTrackingActive = atomic.Bool{}
slowRequestsMu = sync.RWMutex{}
numRe = regexp.MustCompile(`^\d+$`)
uuidRe = regexp.MustCompile(`^[a-f0-9-]{36}$`)
)

func initLatencyTracking(enabled bool) {
latencyTrackingActive.Store(false)
slowRequestPaths = make(map[string]time.Duration)
latencyTrackingEnabled = enabled
}

func triggerLatencyTracking(thread *phpThread, threadAmount int, threadLimit int) {
if !latencyTrackingEnabled || !isCloseToThreadLimit(threadAmount, threadLimit) {
return
}

thread.isLowLatencyThread = true

if !latencyTrackingActive.Load() {
latencyTrackingActive.Store(true)
logger.Info("latency tracking enabled")
}
}

func stopLatencyTracking(threadAmount int, threadLimit int) {
if latencyTrackingActive.Load() && !isCloseToThreadLimit(threadAmount, threadLimit) {
latencyTrackingActive.Store(false)
logger.Info("latency tracking disabled")
}
}

func isCloseToThreadLimit(threadAmount int, threadLimit int) bool {
return threadAmount >= threadLimit*(100-lowLatencyPercentile)/100
}

// record a slow request path
func trackRequestLatency(fc *frankenPHPContext, duration time.Duration, forceTracking bool) {
if duration < slowRequestThreshold && !forceTracking {
return
}

request := fc.getOriginalRequest()
normalizedPath := normalizePath(request.URL.Path)

logger.Debug("slow request detected", "path", normalizedPath, "duration", duration)

slowRequestsMu.Lock()
// if too many slow paths are tracked, clear the map
if len(slowRequestPaths) > maxTrackedPaths {
slowRequestPaths = make(map[string]time.Duration)
}

recordedLatency := slowRequestPaths[normalizedPath]
if recordedLatency == 0 && latencyTrackingActive.Load() {
// a new path that is known to be slow is recorded,
// drain some requests to free up low-latency threads
// TODO: make sure this overhead is acceptable
out:
for i := 0; i < maxRequestDrainage; i++ {
select {
case scaleChan <- fc:
_ = isHighLatencyRequest(fc)
default:
// no more queued requests
//break outer loop
break out
}
}
}

movingAverage := duration/2 + recordedLatency/2
slowRequestPaths[normalizedPath] = movingAverage

// remove the path if it is no longer considered slow
if forceTracking && movingAverage < slowRequestThreshold {
delete(slowRequestPaths, normalizedPath)
}

slowRequestsMu.Unlock()
}

// determine if a request is likely to be high latency based on previous requests with the same path
func isHighLatencyRequest(fc *frankenPHPContext) bool {
if len(slowRequestPaths) == 0 {
return false
}

normalizedPath := normalizePath(fc.getOriginalRequest().URL.Path)

slowRequestsMu.RLock()
latency := slowRequestPaths[normalizedPath]
slowRequestsMu.RUnlock()

fc.isLowLatencyRequest = latency < slowRequestThreshold

return !fc.isLowLatencyRequest
}

// normalize a path by replacing variable parts with wildcards
// e.g. /user/123/profile -> /user/:id/profile
//
// /post/550e8400-e29b-41d4-a716-446655440000 -> /post/:uuid
// /category/very-long-category-name -> /category/:slug
func normalizePath(path string) string {
pathLen := len(path)
if pathLen > 1 && path[pathLen-1] == '/' {
pathLen-- // ignore trailing slash for processing
}

var b strings.Builder
b.Grow(len(path)) // pre-allocate at least original size
start := 0
for i := 0; i <= pathLen; i++ {
if i == pathLen || path[i] == '/' {
if i > start {
seg := path[start:i]
b.WriteString(normalizePathPart(seg))
}
if i < pathLen {
b.WriteByte('/')
}
start = i + 1
}
}
return b.String()
}

// determine if a path part is a wildcard
func normalizePathPart(part string) string {
if len(part) > maxPathPartChars {
// TODO: better slug detection?
return ":slug"
}

if numRe.MatchString(part) {
return ":id"
}

if uuidRe.MatchString(part) {
return ":uuid"
}

return part
}
Loading