23 changes: 23 additions & 0 deletions core/application/startup.go
@@ -319,6 +319,29 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
options.MemoryReclaimerThreshold = *settings.MemoryReclaimerThreshold
}
}
if settings.ForceEvictionWhenBusy != nil {
// Only apply if current value is default (false), suggesting it wasn't set from env var
if !options.ForceEvictionWhenBusy {
options.ForceEvictionWhenBusy = *settings.ForceEvictionWhenBusy
}
}
if settings.LRUEvictionMaxRetries != nil {
// Only apply if current value is default (30), suggesting it wasn't set from env var
if options.LRUEvictionMaxRetries == 0 {
options.LRUEvictionMaxRetries = *settings.LRUEvictionMaxRetries
}
}
if settings.LRUEvictionRetryInterval != nil {
// Only apply if current value is default (1s), suggesting it wasn't set from env var
if options.LRUEvictionRetryInterval == 0 {
dur, err := time.ParseDuration(*settings.LRUEvictionRetryInterval)
if err == nil {
options.LRUEvictionRetryInterval = dur
} else {
xlog.Warn("invalid LRU eviction retry interval in runtime_settings.json", "error", err, "interval", *settings.LRUEvictionRetryInterval)
}
}
}
if settings.AgentJobRetentionDays != nil {
// Only apply if current value is default (0), suggesting it wasn't set from env var
if options.AgentJobRetentionDays == 0 {
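
Note: the three new blocks above all apply the same precedence rule: a value read from runtime_settings.json only takes effect while the in-memory option still holds its zero value, so a setting that already came from an environment variable wins. A minimal, self-contained sketch of that rule, using hypothetical field names rather than the real LocalAI config types:

package main

import "fmt"

// Hypothetical, simplified versions of the settings-file and config types.
type fileSettings struct {
	LRUEvictionMaxRetries *int // pointer: nil means "not present in the file"
}

type appOptions struct {
	LRUEvictionMaxRetries int // may already have been populated from an env var
}

// applyFileSettings mirrors the pattern above: the file value is applied only
// when the option is still at its zero value, so env vars keep priority.
func applyFileSettings(o *appOptions, s fileSettings) {
	if s.LRUEvictionMaxRetries != nil && o.LRUEvictionMaxRetries == 0 {
		o.LRUEvictionMaxRetries = *s.LRUEvictionMaxRetries
	}
}

func main() {
	retries := 10
	opts := appOptions{LRUEvictionMaxRetries: 30} // pretend an env var set this
	applyFileSettings(&opts, fileSettings{LRUEvictionMaxRetries: &retries})
	fmt.Println(opts.LRUEvictionMaxRetries) // still 30: the env value wins
}
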
57 changes: 42 additions & 15 deletions core/application/watchdog.go
@@ -1,8 +1,6 @@
package application

import (
"time"

"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/xlog"
)
@@ -37,27 +35,35 @@ func (a *Application) startWatchdog() error {
model.WithMemoryReclaimer(appConfig.MemoryReclaimerEnabled, appConfig.MemoryReclaimerThreshold),
model.WithForceEvictionWhenBusy(appConfig.ForceEvictionWhenBusy),
)
a.modelLoader.SetWatchDog(wd)

// Create new stop channel
// Create new stop channel BEFORE setting up any goroutines
// This prevents race conditions where the old shutdown handler might
// receive from the closed channel and try to shut down the new watchdog
a.watchdogStop = make(chan bool, 1)

// Set the watchdog on the model loader
a.modelLoader.SetWatchDog(wd)

// Start watchdog goroutine if any periodic checks are enabled
// LRU eviction doesn't need the Run() loop - it's triggered on model load
// But memory reclaimer needs the Run() loop for periodic checking
if appConfig.WatchDogBusy || appConfig.WatchDogIdle || appConfig.MemoryReclaimerEnabled {
go wd.Run()
}

// Setup shutdown handler
// Setup shutdown handler - this goroutine will wait on a.watchdogStop
// which is now a fresh channel, so it won't receive any stale signals
// Note: We capture wd in a local variable to ensure this handler operates
// on the correct watchdog instance (not a later one that gets assigned to wd)
wdForShutdown := wd
go func() {
select {
case <-a.watchdogStop:
xlog.Debug("Watchdog stop signal received")
wd.Shutdown()
wdForShutdown.Shutdown()
case <-appConfig.Context.Done():
xlog.Debug("Context canceled, shutting down watchdog")
wd.Shutdown()
wdForShutdown.Shutdown()
}
}()
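
The wdForShutdown := wd copy above matters because a Go closure captures the variable itself, not the value it held when the goroutine was created; if wd were ever reassigned later in this function, the handler would shut down the wrong instance. A standalone illustration of that capture behaviour (example code, not part of LocalAI):

package main

import "fmt"

func main() {
	x := "first watchdog"
	pinned := x // copy the current value into a fresh variable

	byVariable := func() { fmt.Println(x) }  // captures x itself, sees reassignment
	byCopy := func() { fmt.Println(pinned) } // captures the pinned copy

	x = "second watchdog"
	byVariable() // prints "second watchdog"
	byCopy()     // prints "first watchdog"
}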

@@ -82,20 +88,41 @@ func (a *Application) RestartWatchdog() error {
a.watchdogMutex.Lock()
defer a.watchdogMutex.Unlock()

// Shutdown existing watchdog if running
// Get the old watchdog before we shut it down
oldWD := a.modelLoader.GetWatchDog()

// Get the state from the old watchdog before shutting it down
// This preserves information about loaded models
var oldState model.WatchDogState
if oldWD != nil {
oldState = oldWD.GetState()
}

// Signal all handlers to stop by closing the stop channel
// This will cause any goroutine waiting on <-a.watchdogStop to unblock
if a.watchdogStop != nil {
close(a.watchdogStop)
a.watchdogStop = nil
}

// Shutdown existing watchdog if running
currentWD := a.modelLoader.GetWatchDog()
if currentWD != nil {
currentWD.Shutdown()
// Wait a bit for shutdown to complete
time.Sleep(100 * time.Millisecond)
// Shutdown existing watchdog - this triggers the stop signal
if oldWD != nil {
oldWD.Shutdown()
// Wait for the old watchdog's Run() goroutine to fully shut down
oldWD.WaitDone()
}

// Start watchdog with new settings
return a.startWatchdog()
if err := a.startWatchdog(); err != nil {
return err
}

// Restore the model state from the old watchdog to the new one
// This ensures the new watchdog knows about already-loaded models
newWD := a.modelLoader.GetWatchDog()
if newWD != nil && len(oldState.AddressModelMap) > 0 {
newWD.RestoreState(oldState)
}

return nil
}
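
RestartWatchdog now follows a strict ordering: snapshot the old watchdog's state, close the stop channel, Shutdown() and WaitDone() the old instance, start the replacement, then RestoreState() on the new one. This replaces the previous fixed time.Sleep(100 * time.Millisecond) wait. A compressed, self-contained sketch of that handoff with a generic worker type (hypothetical names, same Shutdown/WaitDone/GetState/RestoreState surface as the watchdog):

package main

import "fmt"

type workerState struct{ models map[string]string }

type worker struct {
	state workerState
	stop  chan struct{}
	done  chan struct{}
}

func newWorker() *worker {
	return &worker{stop: make(chan struct{}), done: make(chan struct{})}
}

func (w *worker) Run()                       { <-w.stop; close(w.done) } // blocks until Shutdown
func (w *worker) Shutdown()                  { close(w.stop) }
func (w *worker) WaitDone()                  { <-w.done }
func (w *worker) GetState() workerState      { return w.state }
func (w *worker) RestoreState(s workerState) { w.state = s }

func main() {
	old := newWorker()
	old.state = workerState{models: map[string]string{"127.0.0.1:9000": "llama"}}
	go old.Run()

	snapshot := old.GetState() // 1. snapshot before anything stops
	old.Shutdown()             // 2. ask the old Run() loop to exit
	old.WaitDone()             // 3. block until it actually has, no sleep-based guessing

	replacement := newWorker() // 4. start the replacement
	go replacement.Run()
	replacement.RestoreState(snapshot) // 5. it now knows about already-loaded models

	fmt.Println("models after restart:", len(replacement.GetState().models))
	replacement.Shutdown()
	replacement.WaitDone()
}
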
81 changes: 79 additions & 2 deletions pkg/model/watchdog.go
@@ -33,6 +33,7 @@ type WatchDog struct {
addressModelMap map[string]string
pm ProcessManager
stop chan bool
done chan bool // Signals when Run() has completely shut down

busyCheck, idleCheck bool
lruLimit int // Maximum number of active backends (0 = unlimited)
@@ -78,6 +79,7 @@ func NewWatchDog(opts ...WatchDogOption) *WatchDog {
lruLimit: o.lruLimit,
addressModelMap: make(map[string]string),
stop: make(chan bool, 1),
done: make(chan bool, 1),
memoryReclaimerEnabled: o.memoryReclaimerEnabled,
memoryReclaimerThreshold: o.memoryReclaimerThreshold,
watchdogInterval: o.watchdogInterval,
@@ -128,6 +130,12 @@ func (wd *WatchDog) Shutdown() {
wd.stop <- true
}

// WaitDone blocks until the watchdog's Run() goroutine has completely shut down.
// This should be called after Shutdown() to ensure the watchdog is fully stopped.
func (wd *WatchDog) WaitDone() {
<-wd.done
}

func (wd *WatchDog) AddAddressModelMap(address string, model string) {
wd.Lock()
defer wd.Unlock()
@@ -173,6 +181,71 @@ func (wd *WatchDog) GetLoadedModelCount() int {
return len(wd.addressModelMap)
}

// WatchDogState holds the current state of models tracked by the watchdog
type WatchDogState struct {
AddressModelMap map[string]string
BusyTime map[string]time.Time
IdleTime map[string]time.Time
LastUsed map[string]time.Time
AddressMap map[string]*process.Process
}

// GetState returns the current state of models tracked by the watchdog
// This can be used to restore state when creating a new watchdog
func (wd *WatchDog) GetState() WatchDogState {
wd.Lock()
defer wd.Unlock()

// Create copies to avoid race conditions
addressModelMap := make(map[string]string, len(wd.addressModelMap))
for k, v := range wd.addressModelMap {
addressModelMap[k] = v
}

busyTime := make(map[string]time.Time, len(wd.busyTime))
for k, v := range wd.busyTime {
busyTime[k] = v
}

idleTime := make(map[string]time.Time, len(wd.idleTime))
for k, v := range wd.idleTime {
idleTime[k] = v
}

lastUsed := make(map[string]time.Time, len(wd.lastUsed))
for k, v := range wd.lastUsed {
lastUsed[k] = v
}

addressMap := make(map[string]*process.Process, len(wd.addressMap))
for k, v := range wd.addressMap {
addressMap[k] = v
}

return WatchDogState{
AddressModelMap: addressModelMap,
BusyTime: busyTime,
IdleTime: idleTime,
LastUsed: lastUsed,
AddressMap: addressMap,
}
}

// RestoreState restores the model state from a previous watchdog
// This should be called after the new watchdog is created but before Run() is started
func (wd *WatchDog) RestoreState(state WatchDogState) {
wd.Lock()
defer wd.Unlock()

wd.addressModelMap = state.AddressModelMap
wd.busyTime = state.BusyTime
wd.idleTime = state.IdleTime
wd.lastUsed = state.LastUsed
wd.addressMap = state.AddressMap

xlog.Info("[WatchDog] Restored model state", "modelCount", len(wd.addressModelMap))
}

// modelUsageInfo holds information about a model's usage for LRU sorting
type modelUsageInfo struct {
address string
@@ -279,6 +352,7 @@ func (wd *WatchDog) Run() {
select {
case <-wd.stop:
xlog.Info("[WatchDog] Stopping watchdog")
wd.done <- true
return
case <-time.After(wd.watchdogInterval):
// Check if any monitoring is enabled
@@ -290,6 +364,7 @@

if !busyCheck && !idleCheck && !memoryCheck {
xlog.Info("[WatchDog] No checks enabled, stopping watchdog")
wd.done <- true
return
}
if busyCheck {
Expand Down Expand Up @@ -462,14 +537,16 @@ func (wd *WatchDog) evictLRUModel() {

xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed)

// Untrack the model
wd.untrack(lruModel.address)
wd.Unlock()

// Shutdown the model
if err := wd.pm.ShutdownModel(lruModel.model); err != nil {
xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model)
} else {
// Untrack the model
wd.Lock()
wd.untrack(lruModel.address)
wd.Unlock()
xlog.Info("[WatchDog] Memory reclaimer eviction complete", "model", lruModel.model)
}
}
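
The reordering above moves the untrack() bookkeeping into the critical section that selected the victim, instead of re-acquiring the lock only after a successful shutdown; the potentially slow ShutdownModel() call still runs without the watchdog mutex held. A generic sketch of that lock discipline (hypothetical cache type, not the real WatchDog):

package main

import (
	"log"
	"sync"
)

type backendCache struct {
	mu      sync.Mutex
	entries map[string]string // address -> model
}

// evict removes the bookkeeping entry while still holding the lock that
// selected the victim, then performs the slow shutdown without the lock.
func (c *backendCache) evict(victim string, stopBackend func(string) error) {
	c.mu.Lock()
	model := c.entries[victim]
	delete(c.entries, victim)
	c.mu.Unlock()

	// Slow, blocking work happens outside the critical section, so other
	// goroutines (model loads, state snapshots) are not blocked behind it.
	if err := stopBackend(model); err != nil {
		log.Println("error shutting down model:", err)
	}
}

func main() {
	c := &backendCache{entries: map[string]string{"127.0.0.1:9000": "llama"}}
	c.evict("127.0.0.1:9000", func(model string) error { return nil })
	log.Println("entries remaining:", len(c.entries))
}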