Skip to content

Commit d5a8b94

Browse files
hiroTamada and claude committed
feat: replace polling with channel-based notifications and default to erofs
Replace the 500ms polling loop in waitForImageReady() with a channel-based pub/sub notification system on the image manager, reducing build-to-SSE lag. Switch the default image format from ext4 to erofs (LZ4-compressed read-only filesystem) for faster, smaller rootfs images. The VM init mounts erofs first with an ext4 fallback for backward compatibility with legacy images. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent eb40457 commit d5a8b94

8 files changed

Lines changed: 205 additions & 47 deletions

File tree

lib/builds/manager.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -948,42 +948,20 @@ func (m *manager) updateBuildComplete(id string, status string, digest *string,
948948
m.notifyStatusChange(id, status)
949949
}
950950

951-
// waitForImageReady polls the image manager until the build's image is ready.
951+
// waitForImageReady blocks until the build's image reaches a terminal state.
952952
// imageRef should be the short repo name (e.g., "builds/abc123" or "myapp")
953953
// matching what triggerConversion stores in the image manager.
954954
// This ensures that when a build reports "ready", the image is actually usable
955955
// for instance creation (fixes KERNEL-863 race condition).
956956
func (m *manager) waitForImageReady(ctx context.Context, imageRef string) error {
957-
// Poll for up to 60 seconds (image conversion is typically fast)
958-
const maxAttempts = 120
959-
const pollInterval = 500 * time.Millisecond
960-
961957
m.logger.Debug("waiting for image to be ready", "image_ref", imageRef)
962958

963-
for attempt := 0; attempt < maxAttempts; attempt++ {
964-
select {
965-
case <-ctx.Done():
966-
return ctx.Err()
967-
default:
968-
}
969-
970-
img, err := m.imageManager.GetImage(ctx, imageRef)
971-
if err == nil {
972-
switch img.Status {
973-
case images.StatusReady:
974-
m.logger.Debug("image is ready", "image_ref", imageRef, "attempts", attempt+1)
975-
return nil
976-
case images.StatusFailed:
977-
return fmt.Errorf("image conversion failed")
978-
case images.StatusPending, images.StatusPulling, images.StatusConverting:
979-
// Still processing, continue polling
980-
}
981-
}
982-
// Image not found or still processing, wait and retry
983-
time.Sleep(pollInterval)
959+
if err := m.imageManager.WaitForReady(ctx, imageRef); err != nil {
960+
return err
984961
}
985962

986-
return fmt.Errorf("timeout waiting for image to be ready after %v", time.Duration(maxAttempts)*pollInterval)
963+
m.logger.Debug("image is ready", "image_ref", imageRef)
964+
return nil
987965
}
988966

989967
// subscribeToStatus adds a subscriber channel for status updates on a build

lib/builds/manager_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package builds
33
import (
44
"context"
55
"encoding/json"
6+
"fmt"
67
"io"
78
"log/slog"
89
"os"
910
"path/filepath"
11+
"sync"
1012
"testing"
1113
"time"
1214

@@ -237,6 +239,7 @@ func (m *mockSecretProvider) GetSecrets(ctx context.Context, secretIDs []string)
237239

238240
// mockImageManager implements images.Manager for testing
239241
type mockImageManager struct {
242+
mu sync.RWMutex
240243
images map[string]*images.Image
241244
getImageErr error
242245
}
@@ -274,11 +277,15 @@ func (m *mockImageManager) ImportLocalImage(ctx context.Context, repo, reference
274277
}
275278

276279
func (m *mockImageManager) GetImage(ctx context.Context, name string) (*images.Image, error) {
280+
m.mu.RLock()
281+
defer m.mu.RUnlock()
277282
if m.getImageErr != nil {
278283
return nil, m.getImageErr
279284
}
280285
if img, ok := m.images[name]; ok {
281-
return img, nil
286+
// Return a copy to avoid races on the Status field
287+
imgCopy := *img
288+
return &imgCopy, nil
282289
}
283290
return nil, images.ErrNotFound
284291
}
@@ -298,14 +305,49 @@ func (m *mockImageManager) TotalOCICacheBytes(ctx context.Context) (int64, error
298305
return 0, nil
299306
}
300307

308+
func (m *mockImageManager) WaitForReady(ctx context.Context, name string) error {
309+
for {
310+
select {
311+
case <-ctx.Done():
312+
return ctx.Err()
313+
default:
314+
}
315+
m.mu.RLock()
316+
img, ok := m.images[name]
317+
var status string
318+
if ok {
319+
status = img.Status
320+
}
321+
m.mu.RUnlock()
322+
switch status {
323+
case images.StatusReady:
324+
return nil
325+
case images.StatusFailed:
326+
return fmt.Errorf("image conversion failed")
327+
}
328+
time.Sleep(50 * time.Millisecond)
329+
}
330+
}
331+
301332
// SetImageReady sets an image to ready status for testing
302333
func (m *mockImageManager) SetImageReady(name string) {
334+
m.mu.Lock()
335+
defer m.mu.Unlock()
303336
m.images[name] = &images.Image{
304337
Name: name,
305338
Status: images.StatusReady,
306339
}
307340
}
308341

342+
// SetImageStatus sets an image's status in a thread-safe way for testing
343+
func (m *mockImageManager) SetImageStatus(name, status string) {
344+
m.mu.Lock()
345+
defer m.mu.Unlock()
346+
if img, ok := m.images[name]; ok {
347+
img.Status = status
348+
}
349+
}
350+
309351
// Test helper to create a manager with test paths and mocks
310352
func setupTestManager(t *testing.T) (*manager, *mockInstanceManager, *mockVolumeManager, string) {
311353
mgr, instanceMgr, volumeMgr, _, tempDir := setupTestManagerWithImageMgr(t)

lib/builds/race_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,19 @@ func TestWaitForImageReady_WaitsForConversion(t *testing.T) {
9797
imageRef := "builds/" + buildID
9898

9999
// Start with image in pending status
100+
imageMgr.mu.Lock()
100101
imageMgr.images[imageRef] = &images.Image{
101102
Name: imageRef,
102103
Status: images.StatusPending,
103104
}
105+
imageMgr.mu.Unlock()
104106

105107
// Simulate conversion completing after a short delay
106108
go func() {
107109
time.Sleep(100 * time.Millisecond)
108-
imageMgr.images[imageRef].Status = images.StatusConverting
110+
imageMgr.SetImageStatus(imageRef, images.StatusConverting)
109111
time.Sleep(100 * time.Millisecond)
110-
imageMgr.images[imageRef].Status = images.StatusReady
112+
imageMgr.SetImageStatus(imageRef, images.StatusReady)
111113
}()
112114

113115
// waitForImageReady should block until conversion completes and then succeed
@@ -131,10 +133,12 @@ func TestWaitForImageReady_ContextCancelled(t *testing.T) {
131133
imageRef := "builds/" + buildID
132134

133135
// Image stays in pending status forever
136+
imageMgr.mu.Lock()
134137
imageMgr.images[imageRef] = &images.Image{
135138
Name: imageRef,
136139
Status: images.StatusPending,
137140
}
141+
imageMgr.mu.Unlock()
138142

139143
// waitForImageReady should return context error
140144
err := mgr.waitForImageReady(ctx, imageRef)
@@ -152,10 +156,12 @@ func TestWaitForImageReady_Failed(t *testing.T) {
152156
imageRef := "builds/" + buildID
153157

154158
// Image is in failed status
159+
imageMgr.mu.Lock()
155160
imageMgr.images[imageRef] = &images.Image{
156161
Name: imageRef,
157162
Status: images.StatusFailed,
158163
}
164+
imageMgr.mu.Unlock()
159165

160166
// waitForImageReady should return error immediately
161167
err := mgr.waitForImageReady(ctx, imageRef)

lib/images/disk.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ type ExportFormat string
1414

1515
const (
1616
FormatExt4 ExportFormat = "ext4" // Read-only ext4 (legacy app images; former default)
17-
FormatErofs ExportFormat = "erofs" // Read-only compressed (future: when kernel supports it)
17+
FormatErofs ExportFormat = "erofs" // Read-only compressed with LZ4 (default for app images)
1818
FormatCpio ExportFormat = "cpio" // Uncompressed archive (initrd, fast boot)
1919
)
2020

2121
// DefaultImageFormat is the default export format for OCI images
22-
const DefaultImageFormat = FormatExt4
22+
const DefaultImageFormat = FormatErofs
2323

2424
// ExportRootfs exports rootfs directory in specified format (public for system manager)
2525
func ExportRootfs(rootfsDir, outputPath string, format ExportFormat) (int64, error) {

lib/images/manager.go

Lines changed: 138 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ const (
2323
StatusFailed = "failed"
2424
)
2525

26+
// StatusEvent is the message delivered to readiness subscribers when an image
// reaches a terminal status.
type StatusEvent struct {
	// Status is the terminal status being reported (StatusReady or StatusFailed).
	Status string
	// Err is the error supplied by the notifier alongside the status; it is
	// nil when the image became ready.
	Err error
}
31+
2632
type Manager interface {
2733
ListImages(ctx context.Context) ([]Image, error)
2834
CreateImage(ctx context.Context, req CreateImageRequest) (*Image, error)
@@ -38,14 +44,19 @@ type Manager interface {
3844
// TotalOCICacheBytes returns the total size of the OCI layer cache.
3945
// Used by the resource manager for disk capacity tracking.
4046
TotalOCICacheBytes(ctx context.Context) (int64, error)
47+
// WaitForReady blocks until the image identified by name reaches a terminal
48+
// state (ready or failed) or the context is cancelled.
49+
WaitForReady(ctx context.Context, name string) error
4150
}
4251

4352
type manager struct {
44-
paths *paths.Paths
45-
ociClient *ociClient
46-
queue *BuildQueue
47-
createMu sync.Mutex
48-
metrics *Metrics
53+
paths *paths.Paths
54+
ociClient *ociClient
55+
queue *BuildQueue
56+
createMu sync.Mutex
57+
metrics *Metrics
58+
readySubscribers map[string][]chan StatusEvent // keyed by digestHex
59+
subscriberMu sync.RWMutex
4960
}
5061

5162
// NewManager creates a new image manager.
@@ -59,9 +70,10 @@ func NewManager(p *paths.Paths, maxConcurrentBuilds int, meter metric.Meter) (Ma
5970
}
6071

6172
m := &manager{
62-
paths: p,
63-
ociClient: ociClient,
64-
queue: NewBuildQueue(maxConcurrentBuilds),
73+
paths: p,
74+
ociClient: ociClient,
75+
queue: NewBuildQueue(maxConcurrentBuilds),
76+
readySubscribers: make(map[string][]chan StatusEvent),
6577
}
6678

6779
// Initialize metrics if meter is provided
@@ -254,7 +266,7 @@ func (m *manager) buildImage(ctx context.Context, ref *ResolvedRef) {
254266
m.updateStatusByDigest(ref, StatusConverting, nil)
255267

256268
diskPath := digestPath(m.paths, ref.Repository(), ref.DigestHex())
257-
// Use default image format (ext4 for now, easy to switch to erofs later)
269+
// Use default image format (erofs: read-only compressed with LZ4)
258270
diskSize, err := ExportRootfs(tempDir, diskPath, DefaultImageFormat)
259271
if err != nil {
260272
m.updateStatusByDigest(ref, StatusFailed, fmt.Errorf("convert to %s: %w", DefaultImageFormat, err))
@@ -286,6 +298,9 @@ func (m *manager) buildImage(ctx context.Context, ref *ResolvedRef) {
286298
return
287299
}
288300

301+
// Notify subscribers that image is ready
302+
m.notifyReady(ref.DigestHex(), StatusReady, nil)
303+
289304
// Only create/update tag symlink on successful completion
290305
if ref.Tag() != "" {
291306
if err := createTagSymlink(m.paths, ref.Repository(), ref.Tag(), ref.DigestHex()); err != nil {
@@ -317,6 +332,11 @@ func (m *manager) updateStatusByDigest(ref *ResolvedRef, status string, err erro
317332
}
318333

319334
writeMetadata(m.paths, ref.Repository(), ref.DigestHex(), meta)
335+
336+
// Notify subscribers of terminal status
337+
if status == StatusReady || status == StatusFailed {
338+
m.notifyReady(ref.DigestHex(), status, err)
339+
}
320340
}
321341

322342
func (m *manager) RecoverInterruptedBuilds() {
@@ -476,3 +496,112 @@ func (m *manager) TotalOCICacheBytes(ctx context.Context) (int64, error) {
476496
}
477497
return total, nil
478498
}
499+
500+
// WaitForReady blocks until the image reaches a terminal state (ready or failed)
501+
// or the context is cancelled.
502+
//
503+
// The image may not exist yet when this is called (e.g., the registry's
504+
// triggerConversion goroutine hasn't called ImportLocalImage yet), so we
505+
// poll briefly for the image to appear before subscribing for notifications.
506+
func (m *manager) WaitForReady(ctx context.Context, name string) error {
507+
// Wait for the image to appear in the store. In the build flow, the
508+
// registry triggers ImportLocalImage asynchronously after a push, so the
509+
// image may not exist when the build manager calls WaitForReady.
510+
const maxWaitForExist = 30 * time.Second
511+
const pollInterval = 100 * time.Millisecond
512+
513+
var img *Image
514+
deadline := time.Now().Add(maxWaitForExist)
515+
for {
516+
got, err := m.GetImage(ctx, name)
517+
if err == nil {
518+
img = got
519+
break
520+
}
521+
if time.Now().After(deadline) {
522+
return fmt.Errorf("get image: %w", err)
523+
}
524+
select {
525+
case <-ctx.Done():
526+
return ctx.Err()
527+
case <-time.After(pollInterval):
528+
}
529+
}
530+
531+
// Check if already in terminal state
532+
switch img.Status {
533+
case StatusReady:
534+
return nil
535+
case StatusFailed:
536+
return fmt.Errorf("image conversion failed")
537+
}
538+
539+
digestHex := strings.TrimPrefix(img.Digest, "sha256:")
540+
541+
// Subscribe BEFORE re-checking to avoid TOCTOU race
542+
ch := make(chan StatusEvent, 1)
543+
m.subscribeToReady(digestHex, ch)
544+
defer m.unsubscribeFromReady(digestHex, ch)
545+
546+
// Re-check after subscribing to close the race window
547+
img, err := m.GetImage(ctx, name)
548+
if err == nil {
549+
switch img.Status {
550+
case StatusReady:
551+
return nil
552+
case StatusFailed:
553+
return fmt.Errorf("image conversion failed")
554+
}
555+
}
556+
557+
// Wait for notification or context cancellation
558+
select {
559+
case event := <-ch:
560+
if event.Status == StatusReady {
561+
return nil
562+
}
563+
return fmt.Errorf("image conversion failed")
564+
case <-ctx.Done():
565+
return ctx.Err()
566+
}
567+
}
568+
569+
// subscribeToReady registers a channel for terminal status notifications on a digest.
570+
func (m *manager) subscribeToReady(digestHex string, ch chan StatusEvent) {
571+
m.subscriberMu.Lock()
572+
defer m.subscriberMu.Unlock()
573+
m.readySubscribers[digestHex] = append(m.readySubscribers[digestHex], ch)
574+
}
575+
576+
// unsubscribeFromReady removes a subscriber channel.
577+
func (m *manager) unsubscribeFromReady(digestHex string, ch chan StatusEvent) {
578+
m.subscriberMu.Lock()
579+
defer m.subscriberMu.Unlock()
580+
581+
subscribers := m.readySubscribers[digestHex]
582+
for i, sub := range subscribers {
583+
if sub == ch {
584+
m.readySubscribers[digestHex] = append(subscribers[:i], subscribers[i+1:]...)
585+
break
586+
}
587+
}
588+
589+
if len(m.readySubscribers[digestHex]) == 0 {
590+
delete(m.readySubscribers, digestHex)
591+
}
592+
}
593+
594+
// notifyReady broadcasts a terminal status event to all subscribers for a digest.
595+
func (m *manager) notifyReady(digestHex string, status string, err error) {
596+
m.subscriberMu.RLock()
597+
defer m.subscriberMu.RUnlock()
598+
599+
event := StatusEvent{Status: status, Err: err}
600+
for _, ch := range m.readySubscribers[digestHex] {
601+
// Non-blocking send — drop if channel is full
602+
select {
603+
case ch <- event:
604+
default:
605+
}
606+
}
607+
}

0 commit comments

Comments
 (0)