From 0b3ca0fdf89efd13250a591755e0651fa62f386e Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Sat, 4 Apr 2026 23:22:08 -0400 Subject: [PATCH 1/5] Add burst-proof resource reservations --- lib/instances/admission.go | 22 ++++ lib/instances/admission_test.go | 33 +++++ lib/instances/create.go | 19 ++- lib/instances/manager.go | 7 +- lib/instances/restore.go | 16 ++- lib/instances/start.go | 16 ++- lib/resources/resource.go | 217 +++++++++++++++++++++++++++----- lib/resources/resource_test.go | 75 +++++++++++ 8 files changed, 365 insertions(+), 40 deletions(-) create mode 100644 lib/instances/admission.go create mode 100644 lib/instances/admission_test.go diff --git a/lib/instances/admission.go b/lib/instances/admission.go new file mode 100644 index 00000000..cc22a099 --- /dev/null +++ b/lib/instances/admission.go @@ -0,0 +1,22 @@ +package instances + +func volumeOverlayReservationBytes(volumes []VolumeAttachment) int64 { + var total int64 + for _, vol := range volumes { + if vol.Overlay { + total += vol.OverlaySize + } + } + return total +} + +func requestedDiskReservationBytes(overlaySize int64, volumes []VolumeAttachment) int64 { + return overlaySize + volumeOverlayReservationBytes(volumes) +} + +func storedDiskReservationBytes(stored *StoredMetadata) int64 { + if stored == nil { + return 0 + } + return requestedDiskReservationBytes(stored.OverlaySize, stored.Volumes) +} diff --git a/lib/instances/admission_test.go b/lib/instances/admission_test.go new file mode 100644 index 00000000..f4607870 --- /dev/null +++ b/lib/instances/admission_test.go @@ -0,0 +1,33 @@ +package instances + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRequestedDiskReservationBytes(t *testing.T) { + t.Parallel() + + diskBytes := requestedDiskReservationBytes(10, []VolumeAttachment{ + {VolumeID: "base-only", Overlay: false, OverlaySize: 100}, + {VolumeID: "overlay-a", Overlay: true, OverlaySize: 20}, + {VolumeID: "overlay-b", Overlay: true, OverlaySize: 30}, + }) + + assert.Equal(t, int64(60), diskBytes) +} + +func TestStoredDiskReservationBytes(t *testing.T) { + t.Parallel() + + diskBytes := storedDiskReservationBytes(&StoredMetadata{ + OverlaySize: 15, + Volumes: []VolumeAttachment{ + {VolumeID: "base-only", Overlay: false, OverlaySize: 100}, + {VolumeID: "overlay", Overlay: true, OverlaySize: 25}, + }, + }) + + assert.Equal(t, int64(40), diskBytes) +} diff --git a/lib/instances/create.go b/lib/instances/create.go index 1abd4c21..d540b718 100644 --- a/lib/instances/create.go +++ b/lib/instances/create.go @@ -195,13 +195,22 @@ func (m *manager) createInstance( return nil, fmt.Errorf("total memory %d (size + hotplug_size) exceeds maximum allowed %d per instance", totalMemory, m.limits.MaxMemoryPerInstance) } - // Validate aggregate resource limits via ResourceValidator (if configured) + diskBytes := requestedDiskReservationBytes(overlaySize, req.Volumes) + reservedResources := false + + // Reserve aggregate resources for this create while it is in flight. if m.resourceValidator != nil { needsGPU := req.GPU != nil && req.GPU.Profile != "" - if err := m.resourceValidator.ValidateAllocation(ctx, vcpus, totalMemory, req.NetworkBandwidthDownload, req.NetworkBandwidthUpload, req.DiskIOBps, needsGPU); err != nil { - log.ErrorContext(ctx, "resource validation failed", "error", err) + if err := m.resourceValidator.ReserveAllocation(ctx, id, vcpus, totalMemory, req.NetworkBandwidthDownload, req.NetworkBandwidthUpload, req.DiskIOBps, diskBytes, needsGPU); err != nil { + log.ErrorContext(ctx, "resource reservation failed", "error", err) return nil, fmt.Errorf("%w: %v", ErrInsufficientResources, err) } + reservedResources = true + defer func() { + if reservedResources { + m.resourceValidator.FinishAllocation(id) + } + }() } if req.Env == nil { @@ -502,6 +511,10 @@ func (m *manager) createInstance( return nil, err } startVMSpanEnd(nil) + if reservedResources { + m.resourceValidator.FinishAllocation(id) + reservedResources = false + } // 20. Persist runtime metadata updates after VM boot. meta = &metadata{StoredMetadata: *stored} diff --git a/lib/instances/manager.go b/lib/instances/manager.go index 4ef8d9bf..1d8dd64b 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -84,7 +84,12 @@ type ResourceValidator interface { // ValidateAllocation checks if the requested resources are available. // Returns nil if allocation is allowed, or a detailed error describing // which resource is insufficient and the current capacity/usage. - ValidateAllocation(ctx context.Context, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, needsGPU bool) error + ValidateAllocation(ctx context.Context, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, diskBytes int64, needsGPU bool) error + // ReserveAllocation tentatively reserves resources for an in-flight operation. + // Call FinishAllocation once the operation fails or becomes visible to resource accounting. + ReserveAllocation(ctx context.Context, instanceID string, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, diskBytes int64, needsGPU bool) error + // FinishAllocation removes any pending reservation for the given instance ID. + FinishAllocation(instanceID string) } type manager struct { diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 68bcb7f1..07d53757 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -59,13 +59,21 @@ func (m *manager) restoreInstance( } // 2b. Validate aggregate resource limits before allocating resources (if configured) + reservedResources := false if m.resourceValidator != nil { needsGPU := stored.GPUProfile != "" totalMemory := stored.Size + stored.HotplugSize - if err := m.resourceValidator.ValidateAllocation(ctx, stored.Vcpus, totalMemory, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload, stored.DiskIOBps, needsGPU); err != nil { - log.ErrorContext(ctx, "resource validation failed for restore", "instance_id", id, "error", err) + diskBytes := storedDiskReservationBytes(stored) + if err := m.resourceValidator.ReserveAllocation(ctx, id, stored.Vcpus, totalMemory, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload, stored.DiskIOBps, diskBytes, needsGPU); err != nil { + log.ErrorContext(ctx, "resource reservation failed for restore", "instance_id", id, "error", err) return nil, fmt.Errorf("%w: %v", ErrInsufficientResources, err) } + reservedResources = true + defer func() { + if reservedResources { + m.resourceValidator.FinishAllocation(id) + } + }() } // 3. Get snapshot directory @@ -253,6 +261,10 @@ func (m *manager) restoreInstance( return nil, fmt.Errorf("resume vm failed: %w", err) } resumeSpanEnd(nil) + if reservedResources { + m.resourceValidator.FinishAllocation(id) + reservedResources = false + } // Forked standby restores may allocate a fresh identity while the guest memory snapshot // still has the source VM's old IP configuration. Reconfigure guest networking after diff --git a/lib/instances/start.go b/lib/instances/start.go index bf0e385f..daac0641 100644 --- a/lib/instances/start.go +++ b/lib/instances/start.go @@ -61,13 +61,21 @@ func (m *manager) startInstance( } // 2b. Validate aggregate resource limits before allocating resources (if configured) + reservedResources := false if m.resourceValidator != nil { needsGPU := stored.GPUProfile != "" totalMemory := stored.Size + stored.HotplugSize - if err := m.resourceValidator.ValidateAllocation(ctx, stored.Vcpus, totalMemory, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload, stored.DiskIOBps, needsGPU); err != nil { - log.ErrorContext(ctx, "resource validation failed for start", "instance_id", id, "error", err) + diskBytes := storedDiskReservationBytes(stored) + if err := m.resourceValidator.ReserveAllocation(ctx, id, stored.Vcpus, totalMemory, stored.NetworkBandwidthDownload, stored.NetworkBandwidthUpload, stored.DiskIOBps, diskBytes, needsGPU); err != nil { + log.ErrorContext(ctx, "resource reservation failed for start", "instance_id", id, "error", err) return nil, fmt.Errorf("%w: %v", ErrInsufficientResources, err) } + reservedResources = true + defer func() { + if reservedResources { + m.resourceValidator.FinishAllocation(id) + } + }() } // 3. Get image info (needed for buildHypervisorConfig) @@ -188,6 +196,10 @@ func (m *manager) startInstance( return nil, err } startVMSpanEnd(nil) + if reservedResources { + m.resourceValidator.FinishAllocation(id) + reservedResources = false + } // Success - release cleanup stack (prevent cleanup) cu.Release() diff --git a/lib/resources/resource.go b/lib/resources/resource.go index 7435421e..34c6adec 100644 --- a/lib/resources/resource.go +++ b/lib/resources/resource.go @@ -155,6 +155,15 @@ type InstanceUtilizationInfo struct { AllocatedMemoryBytes int64 // Allocated memory in bytes (Size + HotplugSize) } +type pendingAllocation struct { + CPU int64 + MemoryBytes int64 + DiskBytes int64 + NetworkBps int64 + DiskIOBps int64 + GPUSlots int +} + // Manager coordinates resource discovery and allocation tracking. type Manager struct { cfg *config.Config @@ -168,6 +177,9 @@ type Manager struct { instanceLister InstanceLister imageLister ImageLister volumeLister VolumeLister + + // Pending allocations are used only for admission decisions. + pending map[string]pendingAllocation } // NewManager creates a new resource manager. @@ -177,6 +189,7 @@ func NewManager(cfg *config.Config, p *paths.Paths) *Manager { paths: p, resources: make(map[ResourceType]Resource), monitoring: &monitoringState{}, + pending: make(map[string]pendingAllocation), } } @@ -400,88 +413,228 @@ func (m *Manager) GetFullStatus(ctx context.Context) (*FullResourceStatus, error // CanAllocate checks if the requested amount can be allocated for a resource type. func (m *Manager) CanAllocate(ctx context.Context, rt ResourceType, amount int64) (bool, error) { - status, err := m.GetStatus(ctx, rt) + m.mu.Lock() + defer m.mu.Unlock() + + status, err := m.getAdmissionStatusLocked(ctx, rt, "") if err != nil { return false, err } return amount <= status.Available, nil } -// ValidateAllocation checks if the requested resources can be allocated. -// Returns nil if allocation is allowed, or a detailed error describing -// which resource is insufficient and the current capacity/usage. -// Parameters match instances.AllocationRequest to implement instances.ResourceValidator. -func (m *Manager) ValidateAllocation(ctx context.Context, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, needsGPU bool) error { +func normalizeNetworkBandwidth(downloadBps, uploadBps int64) int64 { + netBandwidth := downloadBps + if uploadBps > netBandwidth { + netBandwidth = uploadBps + } + return netBandwidth +} + +func newPendingAllocation(vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, diskBytes int64, needsGPU bool) pendingAllocation { + req := pendingAllocation{ + CPU: int64(vcpus), + MemoryBytes: memoryBytes, + DiskBytes: diskBytes, + NetworkBps: normalizeNetworkBandwidth(networkDownloadBps, networkUploadBps), + DiskIOBps: diskIOBps, + } + if needsGPU { + req.GPUSlots = 1 + } + return req +} + +func (m *Manager) pendingAmountLocked(rt ResourceType, excludeID string) int64 { + var total int64 + for id, pending := range m.pending { + if id == excludeID { + continue + } + switch rt { + case ResourceCPU: + total += pending.CPU + case ResourceMemory: + total += pending.MemoryBytes + case ResourceDisk: + total += pending.DiskBytes + case ResourceNetwork: + total += pending.NetworkBps + case ResourceDiskIO: + total += pending.DiskIOBps + } + } + return total +} + +func (m *Manager) pendingGPUSlotsLocked(excludeID string) int { + var total int + for id, pending := range m.pending { + if id == excludeID { + continue + } + total += pending.GPUSlots + } + return total +} + +func (m *Manager) getAdmissionStatusLocked(ctx context.Context, rt ResourceType, excludeID string) (*ResourceStatus, error) { + res, ok := m.resources[rt] + if !ok { + return nil, fmt.Errorf("unknown resource type: %s", rt) + } + + capacity := res.Capacity() + ratio := m.GetOversubRatio(rt) + effectiveLimit := int64(float64(capacity) * ratio) + + allocated, err := res.Allocated(ctx) + if err != nil { + return nil, fmt.Errorf("get allocated %s: %w", rt, err) + } + allocated += m.pendingAmountLocked(rt, excludeID) + + available := effectiveLimit - allocated + if available < 0 { + available = 0 + } + + status := &ResourceStatus{ + Type: rt, + Capacity: capacity, + EffectiveLimit: effectiveLimit, + Allocated: allocated, + Available: available, + OversubRatio: ratio, + } + if rt == ResourceNetwork { + if m.cfg.Capacity.Network != "" { + status.Source = SourceConfigured + } else { + status.Source = SourceDetected + } + } + return status, nil +} + +func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string, req pendingAllocation) error { // Check CPU - if vcpus > 0 { - status, err := m.GetStatus(ctx, ResourceCPU) + if req.CPU > 0 { + status, err := m.getAdmissionStatusLocked(ctx, ResourceCPU, excludeID) if err != nil { return fmt.Errorf("check CPU capacity: %w", err) } - if int64(vcpus) > status.Available { + if req.CPU > status.Available { return fmt.Errorf("insufficient CPU: requested %d vCPUs, but only %d available (currently allocated: %d, effective limit: %d with %.1fx oversubscription)", - vcpus, status.Available, status.Allocated, status.EffectiveLimit, status.OversubRatio) + req.CPU, status.Available, status.Allocated, status.EffectiveLimit, status.OversubRatio) } } // Check Memory - if memoryBytes > 0 { - status, err := m.GetStatus(ctx, ResourceMemory) + if req.MemoryBytes > 0 { + status, err := m.getAdmissionStatusLocked(ctx, ResourceMemory, excludeID) if err != nil { return fmt.Errorf("check memory capacity: %w", err) } - if memoryBytes > status.Available { + if req.MemoryBytes > status.Available { return fmt.Errorf("insufficient memory: requested %s, but only %s available (currently allocated: %s, effective limit: %s with %.1fx oversubscription)", - datasize.ByteSize(memoryBytes).HR(), datasize.ByteSize(status.Available).HR(), datasize.ByteSize(status.Allocated).HR(), datasize.ByteSize(status.EffectiveLimit).HR(), status.OversubRatio) + datasize.ByteSize(req.MemoryBytes).HR(), datasize.ByteSize(status.Available).HR(), datasize.ByteSize(status.Allocated).HR(), datasize.ByteSize(status.EffectiveLimit).HR(), status.OversubRatio) } } - // Check Network (use max of download/upload since they share physical link) - netBandwidth := networkDownloadBps - if networkUploadBps > netBandwidth { - netBandwidth = networkUploadBps + // Check Disk + if req.DiskBytes > 0 { + status, err := m.getAdmissionStatusLocked(ctx, ResourceDisk, excludeID) + if err != nil { + return fmt.Errorf("check disk capacity: %w", err) + } + if req.DiskBytes > status.Available { + return fmt.Errorf("insufficient disk: requested %s, but only %s available (currently allocated: %s, effective limit: %s with %.1fx oversubscription)", + datasize.ByteSize(req.DiskBytes).HR(), datasize.ByteSize(status.Available).HR(), datasize.ByteSize(status.Allocated).HR(), datasize.ByteSize(status.EffectiveLimit).HR(), status.OversubRatio) + } } - if netBandwidth > 0 { - status, err := m.GetStatus(ctx, ResourceNetwork) + + // Check Network (use max of download/upload since they share physical link) + if req.NetworkBps > 0 { + status, err := m.getAdmissionStatusLocked(ctx, ResourceNetwork, excludeID) if err != nil { return fmt.Errorf("check network capacity: %w", err) } - if netBandwidth > status.Available { + if req.NetworkBps > status.Available { return fmt.Errorf("insufficient network bandwidth: requested %s/s, but only %s/s available (currently allocated: %s/s, effective limit: %s/s with %.1fx oversubscription)", - datasize.ByteSize(netBandwidth).HR(), datasize.ByteSize(status.Available).HR(), datasize.ByteSize(status.Allocated).HR(), datasize.ByteSize(status.EffectiveLimit).HR(), status.OversubRatio) + datasize.ByteSize(req.NetworkBps).HR(), datasize.ByteSize(status.Available).HR(), datasize.ByteSize(status.Allocated).HR(), datasize.ByteSize(status.EffectiveLimit).HR(), status.OversubRatio) } } // Check Disk I/O - if diskIOBps > 0 { - status, err := m.GetStatus(ctx, ResourceDiskIO) + if req.DiskIOBps > 0 { + status, err := m.getAdmissionStatusLocked(ctx, ResourceDiskIO, excludeID) if err != nil { return fmt.Errorf("check disk I/O capacity: %w", err) } - if diskIOBps > status.Available { + if req.DiskIOBps > status.Available { return fmt.Errorf("insufficient disk I/O: requested %s/s, but only %s/s available (currently allocated: %s/s, effective limit: %s/s with %.1fx oversubscription)", - datasize.ByteSize(diskIOBps).HR(), datasize.ByteSize(status.Available).HR(), + datasize.ByteSize(req.DiskIOBps).HR(), datasize.ByteSize(status.Available).HR(), datasize.ByteSize(status.Allocated).HR(), datasize.ByteSize(status.EffectiveLimit).HR(), status.OversubRatio) } } // Check GPU if needed - if needsGPU { - gpuStatus := GetGPUStatus() + if req.GPUSlots > 0 { + gpuStatus := currentGPUStatusProvider()() if gpuStatus == nil { return fmt.Errorf("insufficient GPU: no GPU available on this host") } - availableSlots := gpuStatus.TotalSlots - gpuStatus.UsedSlots - if availableSlots <= 0 { - return fmt.Errorf("insufficient GPU: all %d %s slots are in use", - gpuStatus.TotalSlots, gpuStatus.Mode) + availableSlots := gpuStatus.TotalSlots - gpuStatus.UsedSlots - m.pendingGPUSlotsLocked(excludeID) + if availableSlots < req.GPUSlots { + if availableSlots <= 0 { + return fmt.Errorf("insufficient GPU: all %d %s slots are in use", + gpuStatus.TotalSlots, gpuStatus.Mode) + } + return fmt.Errorf("insufficient GPU: requested %d %s slot(s), but only %d available", + req.GPUSlots, gpuStatus.Mode, availableSlots) } } return nil } +// ValidateAllocation checks if the requested resources can be allocated. +// Returns nil if allocation is allowed, or a detailed error describing +// which resource is insufficient and the current capacity/usage. +func (m *Manager) ValidateAllocation(ctx context.Context, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, diskBytes int64, needsGPU bool) error { + m.mu.Lock() + defer m.mu.Unlock() + + req := newPendingAllocation(vcpus, memoryBytes, networkDownloadBps, networkUploadBps, diskIOBps, diskBytes, needsGPU) + return m.validateAllocationLocked(ctx, "", req) +} + +// ReserveAllocation tentatively reserves resources for an in-flight operation. +func (m *Manager) ReserveAllocation(ctx context.Context, instanceID string, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, diskBytes int64, needsGPU bool) error { + m.mu.Lock() + defer m.mu.Unlock() + + req := newPendingAllocation(vcpus, memoryBytes, networkDownloadBps, networkUploadBps, diskIOBps, diskBytes, needsGPU) + if err := m.validateAllocationLocked(ctx, instanceID, req); err != nil { + return err + } + m.pending[instanceID] = req + return nil +} + +// FinishAllocation removes any pending reservation for the given instance ID. +func (m *Manager) FinishAllocation(instanceID string) { + if instanceID == "" { + return + } + + m.mu.Lock() + defer m.mu.Unlock() + delete(m.pending, instanceID) +} + // CPUCapacity returns the raw CPU capacity (number of vCPUs). func (m *Manager) CPUCapacity() int64 { m.mu.RLock() diff --git a/lib/resources/resource_test.go b/lib/resources/resource_test.go index c6b355c1..38bf9322 100644 --- a/lib/resources/resource_test.go +++ b/lib/resources/resource_test.go @@ -457,3 +457,78 @@ func TestDiskBreakdown_IncludesOCICacheAndVolumeOverlays(t *testing.T) { // Overlays should be (10+5) + (8+2) = 25GB assert.Equal(t, int64(25*1024*1024*1024), status.DiskDetail.Overlays) } + +func TestValidateAllocation_Disk(t *testing.T) { + cfg := &config.Config{ + DataDir: t.TempDir(), + Capacity: config.CapacityConfig{ + Disk: "100GB", + }, + Oversubscription: config.OversubscriptionConfig{ + CPU: 1.0, Memory: 1.0, Disk: 1.0, Network: 1.0, DiskIO: 1.0, + }, + } + p := paths.New(cfg.DataDir) + + mgr := NewManager(cfg, p) + mgr.SetInstanceLister(&mockInstanceLister{ + allocations: []InstanceAllocation{ + { + ID: "vm-1", + OverlayBytes: 20 * 1024 * 1024 * 1024, + VolumeOverlayBytes: 5 * 1024 * 1024 * 1024, + State: "Running", + }, + }, + }) + mgr.SetImageLister(&mockImageLister{totalBytes: 10 * 1024 * 1024 * 1024}) + mgr.SetVolumeLister(&mockVolumeLister{totalBytes: 35 * 1024 * 1024 * 1024}) + + err := mgr.Initialize(context.Background()) + require.NoError(t, err) + + err = mgr.ValidateAllocation(context.Background(), 0, 0, 0, 0, 0, 30*1024*1024*1024, false) + require.NoError(t, err) + + err = mgr.ValidateAllocation(context.Background(), 0, 0, 0, 0, 0, 31*1024*1024*1024, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "insufficient disk") +} + +func TestReserveAllocation_TracksPendingCapacity(t *testing.T) { + cfg := &config.Config{ + DataDir: t.TempDir(), + Capacity: config.CapacityConfig{ + Disk: "100GB", + }, + Oversubscription: config.OversubscriptionConfig{ + CPU: 1.0, Memory: 1.0, Disk: 1.0, Network: 1.0, DiskIO: 1.0, + }, + } + p := paths.New(cfg.DataDir) + + mgr := NewManager(cfg, p) + mgr.SetInstanceLister(&mockInstanceLister{}) + mgr.SetImageLister(&mockImageLister{}) + mgr.SetVolumeLister(&mockVolumeLister{}) + + err := mgr.Initialize(context.Background()) + require.NoError(t, err) + + err = mgr.ReserveAllocation(context.Background(), "pending-a", 0, 0, 0, 0, 0, 60*1024*1024*1024, false) + require.NoError(t, err) + + err = mgr.ReserveAllocation(context.Background(), "pending-b", 0, 0, 0, 0, 0, 50*1024*1024*1024, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "insufficient disk") + + visibleStatus, err := mgr.GetStatus(context.Background(), ResourceDisk) + require.NoError(t, err) + assert.Equal(t, int64(0), visibleStatus.Allocated) + + mgr.FinishAllocation("pending-a") + mgr.FinishAllocation("pending-a") + + err = mgr.ReserveAllocation(context.Background(), "pending-b", 0, 0, 0, 0, 0, 50*1024*1024*1024, false) + require.NoError(t, err) +} From 476d2c0cdb5bc06dbb4155b2d65e4616246efa4c Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Sun, 5 Apr 2026 02:05:13 -0400 Subject: [PATCH 2/5] Speed up and share burst-proof resource accounting --- lib/images/disk_usage.go | 119 +++++++++++++ lib/images/manager.go | 79 ++------- lib/instances/admission_allocations.go | 145 ++++++++++++++++ lib/instances/create.go | 1 + lib/instances/manager.go | 49 +----- lib/instances/restore.go | 1 + lib/instances/start.go | 1 + lib/instances/storage.go | 4 + lib/resources/README.md | 31 +++- lib/resources/resource.go | 230 +++++++++++++++++++------ lib/resources/resource_test.go | 34 ++++ lib/volumes/manager.go | 58 ++++++- 12 files changed, 585 insertions(+), 167 deletions(-) create mode 100644 lib/images/disk_usage.go create mode 100644 lib/instances/admission_allocations.go diff --git a/lib/images/disk_usage.go b/lib/images/disk_usage.go new file mode 100644 index 00000000..31ec559d --- /dev/null +++ b/lib/images/disk_usage.go @@ -0,0 +1,119 @@ +package images + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" +) + +// totalReadyImageBytesFast sums ready image sizes directly from metadata.json files. +// This is conservative for admission control and disk accounting: if metadata says an +// image is ready, we count its recorded size without re-validating the rootfs path. +func totalReadyImageBytesFast(imagesDir string) (int64, error) { + var total int64 + + err := filepath.Walk(imagesDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return nil + } + if info.IsDir() || info.Name() != "metadata.json" { + return nil + } + + data, err := os.ReadFile(path) + if err != nil { + return nil + } + + var meta imageMetadata + if err := json.Unmarshal(data, &meta); err != nil { + return nil + } + if meta.Status == StatusReady && meta.SizeBytes > 0 { + total += meta.SizeBytes + } + return nil + }) + if err != nil && !os.IsNotExist(err) { + return 0, fmt.Errorf("walk images directory: %w", err) + } + + return total, nil +} + +// totalOCICacheBlobBytesFast sums blob sizes directly from the OCI cache blob store. +// This counts the actual bytes on disk, including any blob files that are currently +// present but no longer referenced by the OCI layout index. +func totalOCICacheBlobBytesFast(blobDir string) (int64, error) { + var total int64 + + err := filepath.Walk(blobDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return nil + } + if info.IsDir() { + return nil + } + total += info.Size() + return nil + }) + if err != nil && !os.IsNotExist(err) { + return 0, fmt.Errorf("walk OCI cache blobs: %w", err) + } + + return total, nil +} + +func (m *manager) getDiskUsageTotals() (int64, int64, error) { + m.diskUsageMu.RLock() + if m.diskUsageLoaded { + readyImageBytes := m.readyImageBytes + ociCacheBytes := m.ociCacheBytes + m.diskUsageMu.RUnlock() + return readyImageBytes, ociCacheBytes, nil + } + m.diskUsageMu.RUnlock() + + readyImageBytes, ociCacheBytes, err := m.computeDiskUsageTotals() + if err != nil { + return 0, 0, err + } + + m.diskUsageMu.Lock() + if !m.diskUsageLoaded { + m.readyImageBytes = readyImageBytes + m.ociCacheBytes = ociCacheBytes + m.diskUsageLoaded = true + } + readyImageBytes = m.readyImageBytes + ociCacheBytes = m.ociCacheBytes + m.diskUsageMu.Unlock() + + return readyImageBytes, ociCacheBytes, nil +} + +func (m *manager) refreshDiskUsageTotals() { + readyImageBytes, ociCacheBytes, err := m.computeDiskUsageTotals() + if err != nil { + return + } + + m.diskUsageMu.Lock() + m.readyImageBytes = readyImageBytes + m.ociCacheBytes = ociCacheBytes + m.diskUsageLoaded = true + m.diskUsageMu.Unlock() +} + +func (m *manager) computeDiskUsageTotals() (int64, int64, error) { + readyImageBytes, err := totalReadyImageBytesFast(m.paths.ImagesDir()) + if err != nil { + return 0, 0, err + } + ociCacheBytes, err := totalOCICacheBlobBytesFast(m.paths.OCICacheBlobDir()) + if err != nil { + return 0, 0, err + } + return readyImageBytes, ociCacheBytes, nil +} diff --git a/lib/images/manager.go b/lib/images/manager.go index 4987b4f6..e1799bbb 100644 --- a/lib/images/manager.go +++ b/lib/images/manager.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/google/go-containerregistry/pkg/v1/layout" "github.com/kernel/hypeman/lib/paths" "github.com/kernel/hypeman/lib/tags" "go.opentelemetry.io/otel/metric" @@ -55,6 +54,10 @@ type manager struct { ociClient *ociClient queue *BuildQueue createMu sync.Mutex + diskUsageMu sync.RWMutex + diskUsageLoaded bool + readyImageBytes int64 + ociCacheBytes int64 metrics *Metrics readySubscribers map[string][]chan StatusEvent // keyed by digestHex subscriberMu sync.RWMutex @@ -319,6 +322,8 @@ func (m *manager) buildImage(ctx context.Context, ref *ResolvedRef) { } } + m.refreshDiskUsageTotals() + m.recordBuildMetrics(ctx, buildStart, "success") } @@ -438,7 +443,11 @@ func (m *manager) DeleteImage(ctx context.Context, name string) error { if err := deleteTagsForDigest(m.paths, repository, digestHex); err != nil { return err } - return deleteDigest(m.paths, repository, digestHex) + if err := deleteDigest(m.paths, repository, digestHex); err != nil { + return err + } + m.refreshDiskUsageTotals() + return nil } tag := ref.Tag() @@ -467,6 +476,7 @@ func (m *manager) DeleteImage(ctx context.Context, name string) error { fmt.Fprintf(os.Stderr, "Warning: failed to delete orphaned digest %s: %v\n", digestHex, err) return nil } + m.refreshDiskUsageTotals() } return nil @@ -474,75 +484,20 @@ func (m *manager) DeleteImage(ctx context.Context, name string) error { // TotalImageBytes returns the total size of all ready images on disk. func (m *manager) TotalImageBytes(ctx context.Context) (int64, error) { - images, err := m.ListImages(ctx) + readyImageBytes, _, err := m.getDiskUsageTotals() if err != nil { return 0, err } - - var total int64 - for _, img := range images { - if img.Status == StatusReady && img.SizeBytes != nil { - total += *img.SizeBytes - } - } - return total, nil + return readyImageBytes, nil } // TotalOCICacheBytes returns the total size of the OCI layer cache. -// Uses OCI layout metadata instead of walking the filesystem for efficiency. func (m *manager) TotalOCICacheBytes(ctx context.Context) (int64, error) { - path, err := layout.FromPath(m.paths.SystemOCICache()) + _, ociCacheBytes, err := m.getDiskUsageTotals() if err != nil { - return 0, nil // No cache yet - } - - index, err := path.ImageIndex() - if err != nil { - return 0, nil // Empty or invalid cache - } - - manifest, err := index.IndexManifest() - if err != nil { - return 0, nil - } - - // Collect unique blob digests and sizes (layers are shared/deduplicated) - blobSizes := make(map[string]int64) - - for _, desc := range manifest.Manifests { - // Count the manifest blob itself - blobSizes[desc.Digest.String()] = desc.Size - - // Get image to access layers and config - img, err := path.Image(desc.Digest) - if err != nil { - continue - } - - // Count config blob - if configDigest, err := img.ConfigName(); err == nil { - if configFile, err := img.RawConfigFile(); err == nil { - blobSizes[configDigest.String()] = int64(len(configFile)) - } - } - - // Count layer blobs - if layers, err := img.Layers(); err == nil { - for _, layer := range layers { - if digest, err := layer.Digest(); err == nil { - if size, err := layer.Size(); err == nil { - blobSizes[digest.String()] = size - } - } - } - } - } - - var total int64 - for _, size := range blobSizes { - total += size + return 0, err } - return total, nil + return ociCacheBytes, nil } // WaitForReady blocks until the image reaches a terminal state (ready or failed) diff --git a/lib/instances/admission_allocations.go b/lib/instances/admission_allocations.go new file mode 100644 index 00000000..93360af7 --- /dev/null +++ b/lib/instances/admission_allocations.go @@ -0,0 +1,145 @@ +package instances + +import ( + "context" + "os" + "path/filepath" + + "github.com/kernel/hypeman/lib/resources" +) + +// ListInstanceAllocations returns a conservative cached allocation view used by +// both admission control and resource status reporting. The cache is initialized +// once from disk, then kept in sync by instance lifecycle metadata updates. +func (m *manager) ListInstanceAllocations(ctx context.Context) ([]resources.InstanceAllocation, error) { + if err := m.ensureAdmissionAllocations(); err != nil { + return nil, err + } + + m.admissionAllocationsMu.RLock() + defer m.admissionAllocationsMu.RUnlock() + + allocations := make([]resources.InstanceAllocation, 0, len(m.admissionAllocations)) + for _, allocation := range m.admissionAllocations { + allocations = append(allocations, allocation) + } + + return allocations, nil +} + +func (m *manager) ensureAdmissionAllocations() error { + m.admissionAllocationsMu.RLock() + if m.admissionAllocationsLoaded { + m.admissionAllocationsMu.RUnlock() + return nil + } + m.admissionAllocationsMu.RUnlock() + + metaFiles, err := m.listMetadataFiles() + if err != nil { + return err + } + + allocations := make(map[string]resources.InstanceAllocation, len(metaFiles)) + for _, metaFile := range metaFiles { + id := filepath.Base(filepath.Dir(metaFile)) + meta, err := m.loadMetadata(id) + if err != nil { + continue + } + allocations[id] = m.allocationFromStoredMetadata(&meta.StoredMetadata, admissionSocketActive(&meta.StoredMetadata)) + } + + m.admissionAllocationsMu.Lock() + defer m.admissionAllocationsMu.Unlock() + if !m.admissionAllocationsLoaded { + m.admissionAllocations = allocations + m.admissionAllocationsLoaded = true + } + + return nil +} + +func (m *manager) syncAdmissionAllocation(meta *metadata) { + m.admissionAllocationsMu.Lock() + defer m.admissionAllocationsMu.Unlock() + + if !m.admissionAllocationsLoaded { + return + } + if m.admissionAllocations == nil { + m.admissionAllocations = make(map[string]resources.InstanceAllocation) + } + + m.admissionAllocations[meta.Id] = m.allocationFromStoredMetadata(&meta.StoredMetadata, admissionMetadataActive(&meta.StoredMetadata)) +} + +func (m *manager) setAdmissionAllocationActive(stored *StoredMetadata, active bool) { + m.admissionAllocationsMu.Lock() + defer m.admissionAllocationsMu.Unlock() + + if !m.admissionAllocationsLoaded { + return + } + if m.admissionAllocations == nil { + m.admissionAllocations = make(map[string]resources.InstanceAllocation) + } + + m.admissionAllocations[stored.Id] = m.allocationFromStoredMetadata(stored, active) +} + +func (m *manager) deleteAdmissionAllocation(id string) { + m.admissionAllocationsMu.Lock() + defer m.admissionAllocationsMu.Unlock() + + if !m.admissionAllocationsLoaded || m.admissionAllocations == nil { + return + } + delete(m.admissionAllocations, id) +} + +func (m *manager) allocationFromStoredMetadata(stored *StoredMetadata, active bool) resources.InstanceAllocation { + var volumeOverlayBytes int64 + var volumeBytes int64 + for _, vol := range stored.Volumes { + if vol.Overlay { + volumeOverlayBytes += vol.OverlaySize + } + if m.volumeManager != nil { + if volume, err := m.volumeManager.GetVolume(context.Background(), vol.VolumeID); err == nil { + volumeBytes += int64(volume.SizeGb) * 1024 * 1024 * 1024 + } + } + } + + state := string(StateStopped) + if active { + state = string(StateCreated) + } + + return resources.InstanceAllocation{ + ID: stored.Id, + Name: stored.Name, + Vcpus: stored.Vcpus, + MemoryBytes: stored.Size + stored.HotplugSize, + OverlayBytes: stored.OverlaySize, + VolumeOverlayBytes: volumeOverlayBytes, + NetworkDownloadBps: stored.NetworkBandwidthDownload, + NetworkUploadBps: stored.NetworkBandwidthUpload, + DiskIOBps: stored.DiskIOBps, + State: state, + VolumeBytes: volumeBytes, + } +} + +func admissionMetadataActive(stored *StoredMetadata) bool { + return stored.HypervisorPID != nil +} + +func admissionSocketActive(stored *StoredMetadata) bool { + if stored.SocketPath == "" { + return false + } + _, err := os.Stat(stored.SocketPath) + return err == nil +} diff --git a/lib/instances/create.go b/lib/instances/create.go index d540b718..af8d93d7 100644 --- a/lib/instances/create.go +++ b/lib/instances/create.go @@ -511,6 +511,7 @@ func (m *manager) createInstance( return nil, err } startVMSpanEnd(nil) + m.setAdmissionAllocationActive(stored, true) if reservedResources { m.resourceValidator.FinishAllocation(id) reservedResources = false diff --git a/lib/instances/manager.go b/lib/instances/manager.go index 1d8dd64b..70da97be 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -123,6 +123,11 @@ type manager struct { // State change subscriptions for waitForState stateSubscribers *subscribers + // Cached conservative allocation view for fast admission control. + admissionAllocationsMu sync.RWMutex + admissionAllocations map[string]resources.InstanceAllocation + admissionAllocationsLoaded bool + // Hypervisor support vmStarters map[hypervisor.Type]hypervisor.VMStarter defaultHypervisor hypervisor.Type // Default hypervisor type when not specified in request @@ -549,50 +554,6 @@ func (m *manager) DetachVolume(ctx context.Context, id string, volumeId string) return nil, fmt.Errorf("detach volume not yet implemented") } -// ListInstanceAllocations returns resource allocations for all instances. -// Used by the resource manager for capacity tracking. -func (m *manager) ListInstanceAllocations(ctx context.Context) ([]resources.InstanceAllocation, error) { - instances, err := m.listInstances(ctx) - if err != nil { - return nil, err - } - - allocations := make([]resources.InstanceAllocation, 0, len(instances)) - for _, inst := range instances { - // Calculate volume bytes and volume overlay bytes separately - var volumeBytes int64 - var volumeOverlayBytes int64 - for _, vol := range inst.Volumes { - // Get actual volume size from volume manager - if m.volumeManager != nil { - if volume, err := m.volumeManager.GetVolume(ctx, vol.VolumeID); err == nil { - volumeBytes += int64(volume.SizeGb) * 1024 * 1024 * 1024 - } - } - // Track overlay size separately for overlay volumes - if vol.Overlay { - volumeOverlayBytes += vol.OverlaySize - } - } - - allocations = append(allocations, resources.InstanceAllocation{ - ID: inst.Id, - Name: inst.Name, - Vcpus: inst.Vcpus, - MemoryBytes: inst.Size + inst.HotplugSize, - OverlayBytes: inst.OverlaySize, - VolumeOverlayBytes: volumeOverlayBytes, - NetworkDownloadBps: inst.NetworkBandwidthDownload, - NetworkUploadBps: inst.NetworkBandwidthUpload, - DiskIOBps: inst.DiskIOBps, - State: string(inst.State), - VolumeBytes: volumeBytes, - }) - } - - return allocations, nil -} - // ListRunningInstancesInfo returns info needed for utilization metrics collection. // Used by the resource manager for VM utilization tracking. // Includes active VMs in Running or Initializing state. diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 07d53757..8d79b171 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -261,6 +261,7 @@ func (m *manager) restoreInstance( return nil, fmt.Errorf("resume vm failed: %w", err) } resumeSpanEnd(nil) + m.setAdmissionAllocationActive(stored, true) if reservedResources { m.resourceValidator.FinishAllocation(id) reservedResources = false diff --git a/lib/instances/start.go b/lib/instances/start.go index daac0641..90a3af7d 100644 --- a/lib/instances/start.go +++ b/lib/instances/start.go @@ -196,6 +196,7 @@ func (m *manager) startInstance( return nil, err } startVMSpanEnd(nil) + m.setAdmissionAllocationActive(stored, true) if reservedResources { m.resourceValidator.FinishAllocation(id) reservedResources = false diff --git a/lib/instances/storage.go b/lib/instances/storage.go index 4bf7f45b..27e4e7af 100644 --- a/lib/instances/storage.go +++ b/lib/instances/storage.go @@ -79,6 +79,8 @@ func (m *manager) saveMetadata(meta *metadata) error { return fmt.Errorf("write metadata: %w", err) } + m.syncAdmissionAllocation(meta) + return nil } @@ -111,6 +113,8 @@ func (m *manager) deleteInstanceData(id string) error { return fmt.Errorf("remove instance directory: %w", err) } + m.deleteAdmissionAllocation(id) + return nil } diff --git a/lib/resources/README.md b/lib/resources/README.md index 22d29350..af71f6fd 100644 --- a/lib/resources/README.md +++ b/lib/resources/README.md @@ -1,12 +1,13 @@ # Resource Management -Host resource discovery, capacity tracking, and oversubscription-aware allocation management for CPU, memory, disk, and network. +Host resource discovery, capacity tracking, oversubscription-aware allocation management, and burst-proof admission control for CPU, memory, disk, network, and disk I/O. ## Features - **Resource Discovery**: Automatically detects host capacity from `/proc/cpuinfo`, `/proc/meminfo`, filesystem stats, and network interface speed - **Oversubscription**: Configurable ratios per resource type (e.g., 2x CPU oversubscription) - **Allocation Tracking**: Tracks resource usage across all running instances +- **Burst-Proof Admission**: In-flight create/start/restore operations reserve capacity before they become visible to normal allocation tracking - **Bidirectional Network Rate Limiting**: Separate download/upload limits with fair sharing - **API Endpoint**: `GET /resources` returns capacity, allocations, and per-instance breakdown @@ -39,6 +40,7 @@ Host resource discovery, capacity tracking, and oversubscription-aware allocatio ### Disk - Discovered via `statfs()` on DataDir, or configured via `DISK_LIMIT` - Allocated = images (rootfs) + OCI cache + volumes + overlays (rootfs + volume) +- Admission uses the same logical/provisioned disk model as `GET /resources` and `hypeman resources` - Image pulls blocked when <5GB available or image storage exceeds `MAX_IMAGE_STORAGE` ### Network @@ -97,6 +99,33 @@ available = effective_limit - allocated For example, with 64 CPUs and `OVERSUB_CPU=2.0`, up to 128 vCPUs can be allocated across instances. +## Admission Reservations + +Resource accounting uses two views of allocation: + +- **Visible allocation**: what `GetStatus`, `/resources`, metrics, `hypeman resources`, and admission checks use from the current instance/image/volume state +- **Pending allocation**: temporary in-memory reservations for in-flight `create`, `start`, and `restore` operations + +When Hypeman checks admission, it computes: + +```text +admission_allocated = visible_allocated + pending_allocated +admission_available = effective_limit - admission_allocated +``` + +This prevents burst oversubscription: if one lifecycle operation reserves capacity first, the next one sees that reservation immediately even before the first VM is fully visible in normal allocation tracking. + +Important behavior: + +- Pending reservations affect **admission only** +- `GetStatus`, `/resources`, and metrics continue to show **visible allocation only** +- The resource-manager lock only covers reservation bookkeeping and validation +- VM boot, restore, guest-agent waits, and other slow lifecycle steps happen **outside** that lock + +### Hot-Path Note + +Visible allocation is optimized around one conservative cached instance-allocation view plus cached image, OCI cache, and volume totals. Those caches are warmed at startup and then updated when lifecycle or storage state changes. Admission validation reads that visible usage once per reservation and combines it with O(1) pending totals. This keeps both `hypeman resources` and the admission hot path fast without live hypervisor queries or repeated full filesystem scans on each request. + ## API Response ```json diff --git a/lib/resources/resource.go b/lib/resources/resource.go index 34c6adec..3a431a11 100644 --- a/lib/resources/resource.go +++ b/lib/resources/resource.go @@ -164,6 +164,14 @@ type pendingAllocation struct { GPUSlots int } +type admissionUsage struct { + CPU int64 + MemoryBytes int64 + DiskBytes int64 + NetworkBps int64 + DiskIOBps int64 +} + // Manager coordinates resource discovery and allocation tracking. type Manager struct { cfg *config.Config @@ -179,7 +187,8 @@ type Manager struct { volumeLister VolumeLister // Pending allocations are used only for admission decisions. - pending map[string]pendingAllocation + pending map[string]pendingAllocation + pendingTotals pendingAllocation } // NewManager creates a new resource manager. @@ -254,6 +263,27 @@ func (m *Manager) Initialize(ctx context.Context) error { diskIO := NewDiskIOResource(m.DiskIOCapacity(), m.instanceLister) m.resources[ResourceDiskIO] = diskIO + // Warm fast allocation-related caches at startup so the first reservation or + // resource status read doesn't pay the one-time scan cost on the hot path. + if m.instanceLister != nil { + if _, err := m.instanceLister.ListInstanceAllocations(ctx); err != nil { + return fmt.Errorf("warm instance allocations: %w", err) + } + } + if m.imageLister != nil { + if _, err := m.imageLister.TotalImageBytes(ctx); err != nil { + return fmt.Errorf("warm image disk usage totals: %w", err) + } + if _, err := m.imageLister.TotalOCICacheBytes(ctx); err != nil { + return fmt.Errorf("warm OCI cache usage totals: %w", err) + } + } + if m.volumeLister != nil { + if _, err := m.volumeLister.TotalVolumeBytes(ctx); err != nil { + return fmt.Errorf("warm volume usage totals: %w", err) + } + } + return nil } @@ -416,7 +446,29 @@ func (m *Manager) CanAllocate(ctx context.Context, rt ResourceType, amount int64 m.mu.Lock() defer m.mu.Unlock() - status, err := m.getAdmissionStatusLocked(ctx, rt, "") + usage, err := m.collectAdmissionUsageLocked(ctx) + if err != nil { + return false, err + } + pending := m.pendingTotalsExcludingLocked("") + + var visibleAllocated int64 + switch rt { + case ResourceCPU: + visibleAllocated = usage.CPU + case ResourceMemory: + visibleAllocated = usage.MemoryBytes + case ResourceDisk: + visibleAllocated = usage.DiskBytes + case ResourceNetwork: + visibleAllocated = usage.NetworkBps + case ResourceDiskIO: + visibleAllocated = usage.DiskIOBps + default: + return false, fmt.Errorf("unsupported admission resource type: %s", rt) + } + + status, err := m.admissionStatusLocked(rt, visibleAllocated, pending) if err != nil { return false, err } @@ -445,82 +497,139 @@ func newPendingAllocation(vcpus int, memoryBytes int64, networkDownloadBps int64 return req } -func (m *Manager) pendingAmountLocked(rt ResourceType, excludeID string) int64 { - var total int64 - for id, pending := range m.pending { - if id == excludeID { - continue - } - switch rt { - case ResourceCPU: - total += pending.CPU - case ResourceMemory: - total += pending.MemoryBytes - case ResourceDisk: - total += pending.DiskBytes - case ResourceNetwork: - total += pending.NetworkBps - case ResourceDiskIO: - total += pending.DiskIOBps - } - } - return total +func (m *Manager) addPendingTotalsLocked(req pendingAllocation) { + m.pendingTotals.CPU += req.CPU + m.pendingTotals.MemoryBytes += req.MemoryBytes + m.pendingTotals.DiskBytes += req.DiskBytes + m.pendingTotals.NetworkBps += req.NetworkBps + m.pendingTotals.DiskIOBps += req.DiskIOBps + m.pendingTotals.GPUSlots += req.GPUSlots } -func (m *Manager) pendingGPUSlotsLocked(excludeID string) int { - var total int - for id, pending := range m.pending { - if id == excludeID { - continue - } - total += pending.GPUSlots +func (m *Manager) subtractPendingTotalsLocked(req pendingAllocation) { + m.pendingTotals.CPU -= req.CPU + m.pendingTotals.MemoryBytes -= req.MemoryBytes + m.pendingTotals.DiskBytes -= req.DiskBytes + m.pendingTotals.NetworkBps -= req.NetworkBps + m.pendingTotals.DiskIOBps -= req.DiskIOBps + m.pendingTotals.GPUSlots -= req.GPUSlots +} + +func (m *Manager) pendingTotalsExcludingLocked(excludeID string) pendingAllocation { + total := m.pendingTotals + if excludeID == "" { + return total + } + if existing, ok := m.pending[excludeID]; ok { + total.CPU -= existing.CPU + total.MemoryBytes -= existing.MemoryBytes + total.DiskBytes -= existing.DiskBytes + total.NetworkBps -= existing.NetworkBps + total.DiskIOBps -= existing.DiskIOBps + total.GPUSlots -= existing.GPUSlots } return total } -func (m *Manager) getAdmissionStatusLocked(ctx context.Context, rt ResourceType, excludeID string) (*ResourceStatus, error) { - res, ok := m.resources[rt] - if !ok { - return nil, fmt.Errorf("unknown resource type: %s", rt) +func (m *Manager) collectAdmissionUsageLocked(ctx context.Context) (admissionUsage, error) { + var usage admissionUsage + + if m.instanceLister != nil { + instances, err := m.instanceLister.ListInstanceAllocations(ctx) + if err != nil { + return admissionUsage{}, fmt.Errorf("list instance allocations: %w", err) + } + for _, inst := range instances { + if !isActiveState(inst.State) { + continue + } + usage.CPU += int64(inst.Vcpus) + usage.MemoryBytes += inst.MemoryBytes + usage.DiskBytes += inst.OverlayBytes + inst.VolumeOverlayBytes + usage.DiskIOBps += inst.DiskIOBps + + allocNet := inst.NetworkDownloadBps + if inst.NetworkUploadBps > allocNet { + allocNet = inst.NetworkUploadBps + } + usage.NetworkBps += allocNet + } } - capacity := res.Capacity() - ratio := m.GetOversubRatio(rt) - effectiveLimit := int64(float64(capacity) * ratio) + if m.imageLister != nil { + imageBytes, err := m.imageLister.TotalImageBytes(ctx) + if err != nil { + return admissionUsage{}, fmt.Errorf("total image bytes: %w", err) + } + ociCacheBytes, err := m.imageLister.TotalOCICacheBytes(ctx) + if err != nil { + return admissionUsage{}, fmt.Errorf("total OCI cache bytes: %w", err) + } + usage.DiskBytes += imageBytes + ociCacheBytes + } - allocated, err := res.Allocated(ctx) - if err != nil { - return nil, fmt.Errorf("get allocated %s: %w", rt, err) + if m.volumeLister != nil { + volumeBytes, err := m.volumeLister.TotalVolumeBytes(ctx) + if err != nil { + return admissionUsage{}, fmt.Errorf("total volume bytes: %w", err) + } + usage.DiskBytes += volumeBytes } - allocated += m.pendingAmountLocked(rt, excludeID) - available := effectiveLimit - allocated - if available < 0 { - available = 0 + return usage, nil +} + +func (m *Manager) admissionStatusLocked(rt ResourceType, visibleAllocated int64, pending pendingAllocation) (*ResourceStatus, error) { + res, ok := m.resources[rt] + if !ok { + return nil, fmt.Errorf("unknown resource type: %s", rt) } status := &ResourceStatus{ - Type: rt, - Capacity: capacity, - EffectiveLimit: effectiveLimit, - Allocated: allocated, - Available: available, - OversubRatio: ratio, + Type: rt, + Capacity: res.Capacity(), + OversubRatio: m.GetOversubRatio(rt), } - if rt == ResourceNetwork { + status.EffectiveLimit = int64(float64(status.Capacity) * status.OversubRatio) + + switch rt { + case ResourceCPU: + status.Allocated = visibleAllocated + pending.CPU + case ResourceMemory: + status.Allocated = visibleAllocated + pending.MemoryBytes + case ResourceDisk: + status.Allocated = visibleAllocated + pending.DiskBytes + case ResourceNetwork: + status.Allocated = visibleAllocated + pending.NetworkBps if m.cfg.Capacity.Network != "" { status.Source = SourceConfigured } else { status.Source = SourceDetected } + case ResourceDiskIO: + status.Allocated = visibleAllocated + pending.DiskIOBps + default: + return nil, fmt.Errorf("unsupported admission resource type: %s", rt) + } + + status.Available = status.EffectiveLimit - status.Allocated + if status.Available < 0 { + status.Available = 0 } + return status, nil } func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string, req pendingAllocation) error { + usage, err := m.collectAdmissionUsageLocked(ctx) + if err != nil { + return err + } + pending := m.pendingTotalsExcludingLocked(excludeID) + // Check CPU if req.CPU > 0 { - status, err := m.getAdmissionStatusLocked(ctx, ResourceCPU, excludeID) + status, err := m.admissionStatusLocked(ResourceCPU, usage.CPU, pending) if err != nil { return fmt.Errorf("check CPU capacity: %w", err) } @@ -532,7 +641,7 @@ func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string // Check Memory if req.MemoryBytes > 0 { - status, err := m.getAdmissionStatusLocked(ctx, ResourceMemory, excludeID) + status, err := m.admissionStatusLocked(ResourceMemory, usage.MemoryBytes, pending) if err != nil { return fmt.Errorf("check memory capacity: %w", err) } @@ -544,7 +653,7 @@ func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string // Check Disk if req.DiskBytes > 0 { - status, err := m.getAdmissionStatusLocked(ctx, ResourceDisk, excludeID) + status, err := m.admissionStatusLocked(ResourceDisk, usage.DiskBytes, pending) if err != nil { return fmt.Errorf("check disk capacity: %w", err) } @@ -556,7 +665,7 @@ func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string // Check Network (use max of download/upload since they share physical link) if req.NetworkBps > 0 { - status, err := m.getAdmissionStatusLocked(ctx, ResourceNetwork, excludeID) + status, err := m.admissionStatusLocked(ResourceNetwork, usage.NetworkBps, pending) if err != nil { return fmt.Errorf("check network capacity: %w", err) } @@ -568,7 +677,7 @@ func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string // Check Disk I/O if req.DiskIOBps > 0 { - status, err := m.getAdmissionStatusLocked(ctx, ResourceDiskIO, excludeID) + status, err := m.admissionStatusLocked(ResourceDiskIO, usage.DiskIOBps, pending) if err != nil { return fmt.Errorf("check disk I/O capacity: %w", err) } @@ -586,7 +695,7 @@ func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string if gpuStatus == nil { return fmt.Errorf("insufficient GPU: no GPU available on this host") } - availableSlots := gpuStatus.TotalSlots - gpuStatus.UsedSlots - m.pendingGPUSlotsLocked(excludeID) + availableSlots := gpuStatus.TotalSlots - gpuStatus.UsedSlots - pending.GPUSlots if availableSlots < req.GPUSlots { if availableSlots <= 0 { return fmt.Errorf("insufficient GPU: all %d %s slots are in use", @@ -620,7 +729,11 @@ func (m *Manager) ReserveAllocation(ctx context.Context, instanceID string, vcpu if err := m.validateAllocationLocked(ctx, instanceID, req); err != nil { return err } + if existing, ok := m.pending[instanceID]; ok { + m.subtractPendingTotalsLocked(existing) + } m.pending[instanceID] = req + m.addPendingTotalsLocked(req) return nil } @@ -632,7 +745,10 @@ func (m *Manager) FinishAllocation(instanceID string) { m.mu.Lock() defer m.mu.Unlock() - delete(m.pending, instanceID) + if existing, ok := m.pending[instanceID]; ok { + m.subtractPendingTotalsLocked(existing) + delete(m.pending, instanceID) + } } // CPUCapacity returns the raw CPU capacity (number of vCPUs). diff --git a/lib/resources/resource_test.go b/lib/resources/resource_test.go index 38bf9322..852f8a7b 100644 --- a/lib/resources/resource_test.go +++ b/lib/resources/resource_test.go @@ -532,3 +532,37 @@ func TestReserveAllocation_TracksPendingCapacity(t *testing.T) { err = mgr.ReserveAllocation(context.Background(), "pending-b", 0, 0, 0, 0, 0, 50*1024*1024*1024, false) require.NoError(t, err) } + +func TestReserveAllocation_ReplacesExistingReservation(t *testing.T) { + cfg := &config.Config{ + DataDir: t.TempDir(), + Capacity: config.CapacityConfig{ + Disk: "100GB", + }, + Oversubscription: config.OversubscriptionConfig{ + CPU: 1.0, Memory: 1.0, Disk: 1.0, Network: 1.0, DiskIO: 1.0, + }, + } + p := paths.New(cfg.DataDir) + + mgr := NewManager(cfg, p) + mgr.SetInstanceLister(&mockInstanceLister{}) + mgr.SetImageLister(&mockImageLister{}) + mgr.SetVolumeLister(&mockVolumeLister{}) + + err := mgr.Initialize(context.Background()) + require.NoError(t, err) + + err = mgr.ReserveAllocation(context.Background(), "pending-a", 0, 0, 0, 0, 0, 60*1024*1024*1024, false) + require.NoError(t, err) + + err = mgr.ReserveAllocation(context.Background(), "pending-a", 0, 0, 0, 0, 0, 40*1024*1024*1024, false) + require.NoError(t, err) + + err = mgr.ReserveAllocation(context.Background(), "pending-b", 0, 0, 0, 0, 0, 61*1024*1024*1024, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "insufficient disk") + + err = mgr.ReserveAllocation(context.Background(), "pending-b", 0, 0, 0, 0, 0, 60*1024*1024*1024, false) + require.NoError(t, err) +} diff --git a/lib/volumes/manager.go b/lib/volumes/manager.go index 705759c6..f33d0b86 100644 --- a/lib/volumes/manager.go +++ b/lib/volumes/manager.go @@ -44,6 +44,9 @@ type manager struct { paths *paths.Paths maxTotalVolumeStorage int64 // Maximum total volume storage in bytes (0 = unlimited) volumeLocks sync.Map // map[string]*sync.RWMutex - per-volume locks + totalsMu sync.RWMutex + totalVolumeBytes int64 + totalVolumeBytesReady bool metrics *Metrics } @@ -108,6 +111,49 @@ func (m *manager) calculateTotalVolumeStorage(ctx context.Context) (int64, error return totalBytes, nil } +func (m *manager) getTotalVolumeBytes(ctx context.Context) (int64, error) { + m.totalsMu.RLock() + if m.totalVolumeBytesReady { + total := m.totalVolumeBytes + m.totalsMu.RUnlock() + return total, nil + } + m.totalsMu.RUnlock() + + total, err := m.calculateTotalVolumeStorage(ctx) + if err != nil { + return 0, err + } + + m.totalsMu.Lock() + if !m.totalVolumeBytesReady { + m.totalVolumeBytes = total + m.totalVolumeBytesReady = true + } + total = m.totalVolumeBytes + m.totalsMu.Unlock() + + return total, nil +} + +func (m *manager) addVolumeBytes(sizeBytes int64) { + m.totalsMu.Lock() + defer m.totalsMu.Unlock() + if !m.totalVolumeBytesReady { + return + } + m.totalVolumeBytes += sizeBytes +} + +func (m *manager) subtractVolumeBytes(sizeBytes int64) { + m.totalsMu.Lock() + defer m.totalsMu.Unlock() + if !m.totalVolumeBytesReady { + return + } + m.totalVolumeBytes -= sizeBytes +} + // CreateVolume creates a new volume func (m *manager) CreateVolume(ctx context.Context, req CreateVolumeRequest) (*Volume, error) { start := time.Now() @@ -128,7 +174,7 @@ func (m *manager) CreateVolume(ctx context.Context, req CreateVolumeRequest) (*V // Check total volume storage limit if m.maxTotalVolumeStorage > 0 { - currentStorage, err := m.calculateTotalVolumeStorage(ctx) + currentStorage, err := m.getTotalVolumeBytes(ctx) if err != nil { // Log but don't fail - continue with creation // (better to allow creation than block due to listing error) @@ -169,6 +215,8 @@ func (m *manager) CreateVolume(ctx context.Context, req CreateVolumeRequest) (*V return nil, err } + m.addVolumeBytes(int64(req.SizeGb) * 1024 * 1024 * 1024) + m.recordCreateDuration(ctx, start, "success") return m.metadataToVolume(meta), nil } @@ -196,7 +244,7 @@ func (m *manager) CreateVolumeFromArchive(ctx context.Context, req CreateVolumeF // Check total volume storage limit if m.maxTotalVolumeStorage > 0 { - currentStorage, err := m.calculateTotalVolumeStorage(ctx) + currentStorage, err := m.getTotalVolumeBytes(ctx) if err != nil { // Log but don't fail - continue with creation } else { @@ -254,6 +302,8 @@ func (m *manager) CreateVolumeFromArchive(ctx context.Context, req CreateVolumeF return nil, err } + m.addVolumeBytes(int64(actualSizeGb) * 1024 * 1024 * 1024) + m.recordCreateDuration(ctx, start, "success") return m.metadataToVolume(meta), nil } @@ -319,6 +369,8 @@ func (m *manager) DeleteVolume(ctx context.Context, id string) error { return err } + m.subtractVolumeBytes(int64(meta.SizeGb) * 1024 * 1024 * 1024) + // Clean up lock m.volumeLocks.Delete(id) @@ -408,7 +460,7 @@ func (m *manager) GetVolumePath(id string) string { // TotalVolumeBytes returns the total size of all volumes. func (m *manager) TotalVolumeBytes(ctx context.Context) (int64, error) { - return m.calculateTotalVolumeStorage(ctx) + return m.getTotalVolumeBytes(ctx) } // metadataToVolume converts stored metadata to a Volume struct From ee9468ea1853ed54584b92e6233dbcb6f11cfc01 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Sun, 5 Apr 2026 02:27:11 -0400 Subject: [PATCH 3/5] Rename disk usage helpers semantically --- lib/images/disk_usage.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/images/disk_usage.go b/lib/images/disk_usage.go index 31ec559d..aab6415f 100644 --- a/lib/images/disk_usage.go +++ b/lib/images/disk_usage.go @@ -7,10 +7,10 @@ import ( "path/filepath" ) -// totalReadyImageBytesFast sums ready image sizes directly from metadata.json files. +// totalReadyImageBytesFromMetadata sums ready image sizes directly from metadata.json files. // This is conservative for admission control and disk accounting: if metadata says an // image is ready, we count its recorded size without re-validating the rootfs path. -func totalReadyImageBytesFast(imagesDir string) (int64, error) { +func totalReadyImageBytesFromMetadata(imagesDir string) (int64, error) { var total int64 err := filepath.Walk(imagesDir, func(path string, info os.FileInfo, err error) error { @@ -42,10 +42,10 @@ func totalReadyImageBytesFast(imagesDir string) (int64, error) { return total, nil } -// totalOCICacheBlobBytesFast sums blob sizes directly from the OCI cache blob store. +// totalOCICacheBlobBytesFromFilesystem sums blob sizes directly from the OCI cache blob store. // This counts the actual bytes on disk, including any blob files that are currently // present but no longer referenced by the OCI layout index. -func totalOCICacheBlobBytesFast(blobDir string) (int64, error) { +func totalOCICacheBlobBytesFromFilesystem(blobDir string) (int64, error) { var total int64 err := filepath.Walk(blobDir, func(path string, info os.FileInfo, err error) error { @@ -107,11 +107,11 @@ func (m *manager) refreshDiskUsageTotals() { } func (m *manager) computeDiskUsageTotals() (int64, int64, error) { - readyImageBytes, err := totalReadyImageBytesFast(m.paths.ImagesDir()) + readyImageBytes, err := totalReadyImageBytesFromMetadata(m.paths.ImagesDir()) if err != nil { return 0, 0, err } - ociCacheBytes, err := totalOCICacheBlobBytesFast(m.paths.OCICacheBlobDir()) + ociCacheBytes, err := totalOCICacheBlobBytesFromFilesystem(m.paths.OCICacheBlobDir()) if err != nil { return 0, 0, err } From 5bc10ad8d8b2b7c8ec305ba2df82ce36987c1942 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Mon, 6 Apr 2026 11:12:26 -0400 Subject: [PATCH 4/5] Harden burst-proof accounting paths --- lib/images/disk_usage.go | 65 ++++++++++++++++++++++++-- lib/images/disk_usage_test.go | 37 +++++++++++++++ lib/instances/admission_allocations.go | 6 +++ lib/instances/create.go | 4 ++ lib/instances/restore.go | 4 ++ lib/instances/start.go | 4 ++ lib/resources/resource.go | 8 ++-- 7 files changed, 119 insertions(+), 9 deletions(-) create mode 100644 lib/images/disk_usage_test.go diff --git a/lib/images/disk_usage.go b/lib/images/disk_usage.go index aab6415f..47914c27 100644 --- a/lib/images/disk_usage.go +++ b/lib/images/disk_usage.go @@ -9,13 +9,18 @@ import ( // totalReadyImageBytesFromMetadata sums ready image sizes directly from metadata.json files. // This is conservative for admission control and disk accounting: if metadata says an -// image is ready, we count its recorded size without re-validating the rootfs path. +// image is ready, we count its recorded size without re-validating the rootfs path. If +// the metadata file is unreadable or malformed, we fall back to counting any rootfs disk +// files found in the digest directory so we do not undercount host disk usage. func totalReadyImageBytesFromMetadata(imagesDir string) (int64, error) { var total int64 err := filepath.Walk(imagesDir, func(path string, info os.FileInfo, err error) error { if err != nil { - return nil + if os.IsNotExist(err) { + return nil + } + return err } if info.IsDir() || info.Name() != "metadata.json" { return nil @@ -23,15 +28,33 @@ func totalReadyImageBytesFromMetadata(imagesDir string) (int64, error) { data, err := os.ReadFile(path) if err != nil { - return nil + rootfsBytes, fallbackErr := totalRootfsBytesInDigestDir(filepath.Dir(path)) + if fallbackErr == nil { + total += rootfsBytes + return nil + } + return fmt.Errorf("read image metadata %s: %w", path, err) } var meta imageMetadata if err := json.Unmarshal(data, &meta); err != nil { - return nil + rootfsBytes, fallbackErr := totalRootfsBytesInDigestDir(filepath.Dir(path)) + if fallbackErr == nil { + total += rootfsBytes + return nil + } + return fmt.Errorf("unmarshal image metadata %s: %w", path, err) } if meta.Status == StatusReady && meta.SizeBytes > 0 { total += meta.SizeBytes + return nil + } + if meta.Status == StatusReady { + rootfsBytes, err := totalRootfsBytesInDigestDir(filepath.Dir(path)) + if err != nil { + return fmt.Errorf("stat ready image rootfs for %s: %w", path, err) + } + total += rootfsBytes } return nil }) @@ -50,7 +73,10 @@ func totalOCICacheBlobBytesFromFilesystem(blobDir string) (int64, error) { err := filepath.Walk(blobDir, func(path string, info os.FileInfo, err error) error { if err != nil { - return nil + if os.IsNotExist(err) { + return nil + } + return err } if info.IsDir() { return nil @@ -117,3 +143,32 @@ func (m *manager) computeDiskUsageTotals() (int64, int64, error) { } return readyImageBytes, ociCacheBytes, nil } + +func totalRootfsBytesInDigestDir(digestDir string) (int64, error) { + rootfsPaths, err := filepath.Glob(filepath.Join(digestDir, "rootfs.*")) + if err != nil { + return 0, err + } + if len(rootfsPaths) == 0 { + return 0, os.ErrNotExist + } + + var total int64 + for _, rootfsPath := range rootfsPaths { + info, err := os.Stat(rootfsPath) + if err != nil { + if os.IsNotExist(err) { + continue + } + return 0, err + } + if info.IsDir() { + continue + } + total += info.Size() + } + if total == 0 { + return 0, os.ErrNotExist + } + return total, nil +} diff --git a/lib/images/disk_usage_test.go b/lib/images/disk_usage_test.go new file mode 100644 index 00000000..36e5bc0d --- /dev/null +++ b/lib/images/disk_usage_test.go @@ -0,0 +1,37 @@ +package images + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTotalReadyImageBytesFromMetadata_UsesRootfsFallbackForMalformedMetadata(t *testing.T) { + t.Parallel() + + imagesDir := t.TempDir() + digestDir := filepath.Join(imagesDir, "docker.io", "library", "alpine", "sha256deadbeef") + require.NoError(t, os.MkdirAll(digestDir, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(digestDir, "metadata.json"), []byte("{not-json"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(digestDir, "rootfs.erofs"), []byte("rootfs-data"), 0o644)) + + total, err := totalReadyImageBytesFromMetadata(imagesDir) + require.NoError(t, err) + require.Equal(t, int64(len("rootfs-data")), total) +} + +func TestTotalReadyImageBytesFromMetadata_UsesRootfsFallbackForReadyImageWithoutSize(t *testing.T) { + t.Parallel() + + imagesDir := t.TempDir() + digestDir := filepath.Join(imagesDir, "docker.io", "library", "alpine", "sha256deadbeef") + require.NoError(t, os.MkdirAll(digestDir, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(digestDir, "metadata.json"), []byte(`{"status":"ready","size_bytes":0}`), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(digestDir, "rootfs.erofs"), []byte("another-rootfs"), 0o644)) + + total, err := totalReadyImageBytesFromMetadata(imagesDir) + require.NoError(t, err) + require.Equal(t, int64(len("another-rootfs")), total) +} diff --git a/lib/instances/admission_allocations.go b/lib/instances/admission_allocations.go index 93360af7..5a715d28 100644 --- a/lib/instances/admission_allocations.go +++ b/lib/instances/admission_allocations.go @@ -47,6 +47,9 @@ func (m *manager) ensureAdmissionAllocations() error { if err != nil { continue } + // Cold-start cache hydration uses socket existence as the ground-truth + // signal for "currently active" because persisted metadata can outlive a + // crashed or already-stopped hypervisor process. allocations[id] = m.allocationFromStoredMetadata(&meta.StoredMetadata, admissionSocketActive(&meta.StoredMetadata)) } @@ -71,6 +74,9 @@ func (m *manager) syncAdmissionAllocation(meta *metadata) { m.admissionAllocations = make(map[string]resources.InstanceAllocation) } + // Incremental cache updates trust metadata because lifecycle paths update + // visibility explicitly once boot/restore has succeeded, avoiding repeated + // filesystem/socket probes on the hot path. m.admissionAllocations[meta.Id] = m.allocationFromStoredMetadata(&meta.StoredMetadata, admissionMetadataActive(&meta.StoredMetadata)) } diff --git a/lib/instances/create.go b/lib/instances/create.go index af8d93d7..45935af8 100644 --- a/lib/instances/create.go +++ b/lib/instances/create.go @@ -511,6 +511,10 @@ func (m *manager) createInstance( return nil, err } startVMSpanEnd(nil) + // Mark the instance visible before releasing its pending reservation so we + // never create an undercount window. The tiny overlap is intentionally + // over-conservative: concurrent admissions may briefly see both visible and + // pending usage for this instance, but they will not oversubscribe the host. m.setAdmissionAllocationActive(stored, true) if reservedResources { m.resourceValidator.FinishAllocation(id) diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 8d79b171..18023218 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -261,6 +261,10 @@ func (m *manager) restoreInstance( return nil, fmt.Errorf("resume vm failed: %w", err) } resumeSpanEnd(nil) + // Mark the instance visible before releasing its pending reservation so we + // never create an undercount window. The tiny overlap is intentionally + // over-conservative: concurrent admissions may briefly see both visible and + // pending usage for this instance, but they will not oversubscribe the host. m.setAdmissionAllocationActive(stored, true) if reservedResources { m.resourceValidator.FinishAllocation(id) diff --git a/lib/instances/start.go b/lib/instances/start.go index 90a3af7d..8da3026e 100644 --- a/lib/instances/start.go +++ b/lib/instances/start.go @@ -196,6 +196,10 @@ func (m *manager) startInstance( return nil, err } startVMSpanEnd(nil) + // Mark the instance visible before releasing its pending reservation so we + // never create an undercount window. The tiny overlap is intentionally + // over-conservative: concurrent admissions may briefly see both visible and + // pending usage for this instance, but they will not oversubscribe the host. m.setAdmissionAllocationActive(stored, true) if reservedResources { m.resourceValidator.FinishAllocation(id) diff --git a/lib/resources/resource.go b/lib/resources/resource.go index 3a431a11..79f1c974 100644 --- a/lib/resources/resource.go +++ b/lib/resources/resource.go @@ -443,8 +443,8 @@ func (m *Manager) GetFullStatus(ctx context.Context) (*FullResourceStatus, error // CanAllocate checks if the requested amount can be allocated for a resource type. func (m *Manager) CanAllocate(ctx context.Context, rt ResourceType, amount int64) (bool, error) { - m.mu.Lock() - defer m.mu.Unlock() + m.mu.RLock() + defer m.mu.RUnlock() usage, err := m.collectAdmissionUsageLocked(ctx) if err != nil { @@ -713,8 +713,8 @@ func (m *Manager) validateAllocationLocked(ctx context.Context, excludeID string // Returns nil if allocation is allowed, or a detailed error describing // which resource is insufficient and the current capacity/usage. func (m *Manager) ValidateAllocation(ctx context.Context, vcpus int, memoryBytes int64, networkDownloadBps int64, networkUploadBps int64, diskIOBps int64, diskBytes int64, needsGPU bool) error { - m.mu.Lock() - defer m.mu.Unlock() + m.mu.RLock() + defer m.mu.RUnlock() req := newPendingAllocation(vcpus, memoryBytes, networkDownloadBps, networkUploadBps, diskIOBps, diskBytes, needsGPU) return m.validateAllocationLocked(ctx, "", req) From 9566638b3fd1dcae51b27c053a4bcbe10cba84e5 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Mon, 6 Apr 2026 11:34:19 -0400 Subject: [PATCH 5/5] Fix restore admission cache rollback --- lib/instances/admission_allocations.go | 11 ++++++ lib/instances/admission_allocations_test.go | 41 +++++++++++++++++++++ lib/instances/restore.go | 1 + 3 files changed, 53 insertions(+) create mode 100644 lib/instances/admission_allocations_test.go diff --git a/lib/instances/admission_allocations.go b/lib/instances/admission_allocations.go index 5a715d28..6832b7dc 100644 --- a/lib/instances/admission_allocations.go +++ b/lib/instances/admission_allocations.go @@ -94,6 +94,17 @@ func (m *manager) setAdmissionAllocationActive(stored *StoredMetadata, active bo m.admissionAllocations[stored.Id] = m.allocationFromStoredMetadata(stored, active) } +func (m *manager) rollbackAdmissionAllocationActive(stored *StoredMetadata) { + if stored == nil { + return + } + // Failed post-boot/restore steps should not leave the cached visible + // allocation marked active. Clear the in-memory PID first so any later sync + // from this metadata view also treats the instance as inactive. + stored.HypervisorPID = nil + m.setAdmissionAllocationActive(stored, false) +} + func (m *manager) deleteAdmissionAllocation(id string) { m.admissionAllocationsMu.Lock() defer m.admissionAllocationsMu.Unlock() diff --git a/lib/instances/admission_allocations_test.go b/lib/instances/admission_allocations_test.go new file mode 100644 index 00000000..1600604a --- /dev/null +++ b/lib/instances/admission_allocations_test.go @@ -0,0 +1,41 @@ +package instances + +import ( + "context" + "testing" + + "github.com/kernel/hypeman/lib/resources" + "github.com/stretchr/testify/require" +) + +func TestRollbackAdmissionAllocationActiveClearsVisibleAllocation(t *testing.T) { + t.Parallel() + + m := &manager{ + admissionAllocations: make(map[string]resources.InstanceAllocation), + admissionAllocationsLoaded: true, + } + pid := 1234 + stored := &StoredMetadata{ + Id: "inst-1", + Name: "test-instance", + Vcpus: 2, + Size: 1024, + HypervisorPID: &pid, + } + + m.setAdmissionAllocationActive(stored, true) + + allocs, err := m.ListInstanceAllocations(context.Background()) + require.NoError(t, err) + require.Len(t, allocs, 1) + require.Equal(t, string(StateCreated), allocs[0].State) + + m.rollbackAdmissionAllocationActive(stored) + + allocs, err = m.ListInstanceAllocations(context.Background()) + require.NoError(t, err) + require.Len(t, allocs, 1) + require.Nil(t, stored.HypervisorPID) + require.Equal(t, string(StateStopped), allocs[0].State) +} diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 18023218..c4658ba7 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -284,6 +284,7 @@ func (m *manager) restoreInstance( reconfigureSpanEnd(err) log.ErrorContext(ctx, "failed to configure guest network after restore", "instance_id", id, "error", err) _ = hv.Shutdown(ctx) + m.rollbackAdmissionAllocationActive(stored) releaseNetwork() return nil, fmt.Errorf("configure guest network after restore: %w", err) }