diff --git a/cmd/obol/main.go b/cmd/obol/main.go index 72b8d4f6..59028a67 100644 --- a/cmd/obol/main.go +++ b/cmd/obol/main.go @@ -225,8 +225,15 @@ GLOBAL OPTIONS:{{template "visibleFlagTemplate" .}}{{end}} { Name: "down", Usage: "Stop the Obol Stack", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "yes", + Aliases: []string{"y"}, + Usage: "Skip the live-services confirmation prompt (required in non-interactive shells when offers are running)", + }, + }, Action: func(ctx context.Context, cmd *cli.Command) error { - return stack.Down(cfg, getUI(cmd)) + return stack.Down(cfg, getUI(cmd), cmd.Bool("yes")) }, }, { @@ -238,9 +245,14 @@ GLOBAL OPTIONS:{{template "visibleFlagTemplate" .}}{{end}} Aliases: []string{"f"}, Usage: "Also delete persistent data", }, + &cli.BoolFlag{ + Name: "yes", + Aliases: []string{"y"}, + Usage: "Skip the live-services confirmation prompt (required in non-interactive shells when offers are running)", + }, }, Action: func(ctx context.Context, cmd *cli.Command) error { - return stack.Purge(cfg, getUI(cmd), cmd.Bool("force")) + return stack.Purge(cfg, getUI(cmd), cmd.Bool("force"), cmd.Bool("yes")) }, }, }, diff --git a/internal/stack/safety.go b/internal/stack/safety.go new file mode 100644 index 00000000..643c844d --- /dev/null +++ b/internal/stack/safety.go @@ -0,0 +1,283 @@ +package stack + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "syscall" + + "github.com/ObolNetwork/obol-stack/internal/config" + "github.com/ObolNetwork/obol-stack/internal/kubectl" + "github.com/ObolNetwork/obol-stack/internal/ui" +) + +// LiveServiceOffer is the subset of a ServiceOffer relevant to the safety +// check: enough to render the prompt without pulling in monetizeapi. +type LiveServiceOffer struct { + Namespace string + Name string + Type string + Model string + Path string + Price string +} + +// LiveHostGateway is a sell-inference offer with a still-running host +// gateway process (PID file present and alive). These survive `obol stack +// down` of the cluster and need a separate signal. +type LiveHostGateway struct { + Name string + PID int +} + +// LiveServices captures the snapshot used by ConfirmRunningServicesLoss. +type LiveServices struct { + Offers []LiveServiceOffer + Gateways []LiveHostGateway +} + +// Empty returns true when no live cluster services and no live host +// gateways were found. The caller treats this as "safe to proceed without +// prompting". +func (s LiveServices) Empty() bool { return len(s.Offers) == 0 && len(s.Gateways) == 0 } + +// ConfirmRunningServicesLoss warns the operator when a destructive action +// (`obol stack down` / `obol stack purge`) is about to tear down a cluster +// that is currently serving traffic, and asks whether to proceed. +// +// Returns true when the caller should continue. The rules: +// +// - No live offers and no live gateways -> pass through (no prompt). +// - skipConfirm (operator passed --yes) -> pass through. +// - Interactive TTY -> render the list and ask `ui.Confirm` (default N). +// - Non-interactive without --yes -> return a non-nil error so the +// caller fails closed. This is the safety bar that prevents a stray +// `ssh host ''` from a non-prod worktree wiping the +// production stack without an explicit operator decision. +func ConfirmRunningServicesLoss(cfg *config.Config, u *ui.UI, action string, skipConfirm bool) (bool, error) { + services := DiscoverLiveServices(cfg) + if services.Empty() { + return true, nil + } + if skipConfirm { + u.Warnf("%s: %d live offer(s), %d host gateway(s) — proceeding because --yes was passed", + action, len(services.Offers), len(services.Gateways)) + return true, nil + } + + if !u.IsTTY() { + return false, fmt.Errorf("refusing to run %q non-interactively while %d live offer(s) and %d host gateway(s) are serving traffic: pass --yes to confirm", + action, len(services.Offers), len(services.Gateways)) + } + + u.Blank() + u.Warnf("The following services are currently live on this stack:") + u.Blank() + for _, o := range services.Offers { + ns := o.Namespace + if ns == "" { + ns = "default" + } + summary := o.Type + if o.Model != "" { + summary = fmt.Sprintf("%s, %s", summary, o.Model) + } + if o.Price != "" { + summary = fmt.Sprintf("%s, %s", summary, o.Price) + } + u.Printf(" • %s/%s", ns, o.Name) + if o.Path != "" { + u.Dim(" endpoint: " + o.Path) + } + u.Dim(" " + summary) + } + for _, g := range services.Gateways { + u.Printf(" • sell-inference gateway %s (pid %d) will be orphaned", g.Name, g.PID) + } + u.Blank() + u.Dim(fmt.Sprintf(" After `%s`, external buyers will see 402/530 errors until `obol stack up`.", action)) + u.Blank() + + return u.Confirm(fmt.Sprintf("Proceed with %s?", action), false), nil +} + +// DiscoverLiveServices snapshots cluster-side ServiceOffers in a +// payment-gate-ready state and host-side sell-inference gateways with +// alive PID files. Errors are intentionally swallowed: a discovery failure +// must not block `obol stack down`. A best-effort empty snapshot is +// returned so the caller falls through to "no prompt needed" only when we +// have positive evidence of nothing live. +func DiscoverLiveServices(cfg *config.Config) LiveServices { + return LiveServices{ + Offers: discoverLiveOffers(cfg), + Gateways: discoverLiveHostGateways(cfg), + } +} + +// discoverLiveOffers returns ServiceOffers whose PaymentGateReady AND +// RoutePublished conditions are True. We deliberately do not require +// `Ready=True` — the `Registered` condition stays False for unregistered +// offers indefinitely, and excluding those would let `obol stack down` +// silently nuke real production traffic (which is exactly the regression +// this gate exists to prevent). +func discoverLiveOffers(cfg *config.Config) []LiveServiceOffer { + if err := kubectl.EnsureCluster(cfg); err != nil { + // Cluster already unreachable: down/purge is a no-op for the + // in-cluster side. Caller will still see host gateways. + return nil + } + bin, kc := kubectl.Paths(cfg) + raw, err := kubectl.Output(bin, kc, "get", "serviceoffers.obol.org", "-A", "-o", "json") + if err != nil { + return nil + } + var list struct { + Items []rawOffer `json:"items"` + } + if err := json.Unmarshal([]byte(raw), &list); err != nil { + return nil + } + + out := make([]LiveServiceOffer, 0, len(list.Items)) + for _, item := range list.Items { + if !item.gateReady() { + continue + } + out = append(out, LiveServiceOffer{ + Namespace: item.Metadata.Namespace, + Name: item.Metadata.Name, + Type: item.Spec.Type, + Model: item.Spec.Model.Name, + Path: item.Spec.Path, + Price: item.priceSummary(), + }) + } + sort.Slice(out, func(i, j int) bool { + if out[i].Namespace == out[j].Namespace { + return out[i].Name < out[j].Name + } + return out[i].Namespace < out[j].Namespace + }) + return out +} + +// rawOffer mirrors the subset of monetizeapi.ServiceOffer used here. We +// avoid importing monetizeapi to keep internal/stack free of CRD typing. +type rawOffer struct { + Metadata struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + } `json:"metadata"` + Spec struct { + Type string `json:"type"` + Path string `json:"path"` + Model struct { + Name string `json:"name"` + } `json:"model"` + Payment struct { + Price struct { + PerRequest string `json:"perRequest"` + PerMTok string `json:"perMTok"` + PerHour string `json:"perHour"` + } `json:"price"` + Asset struct { + Symbol string `json:"symbol"` + } `json:"asset"` + } `json:"payment"` + } `json:"spec"` + Status struct { + Conditions []struct { + Type string `json:"type"` + Status string `json:"status"` + } `json:"conditions"` + } `json:"status"` +} + +func (o rawOffer) gateReady() bool { + var paymentGate, routePublished bool + for _, c := range o.Status.Conditions { + if c.Status != "True" { + continue + } + switch c.Type { + case "PaymentGateReady": + paymentGate = true + case "RoutePublished": + routePublished = true + } + } + return paymentGate && routePublished +} + +func (o rawOffer) priceSummary() string { + asset := o.Spec.Payment.Asset.Symbol + if asset == "" { + asset = "USDC" + } + switch { + case o.Spec.Payment.Price.PerRequest != "": + return fmt.Sprintf("%s %s/request", o.Spec.Payment.Price.PerRequest, asset) + case o.Spec.Payment.Price.PerMTok != "": + return fmt.Sprintf("%s %s/MTok", o.Spec.Payment.Price.PerMTok, asset) + case o.Spec.Payment.Price.PerHour != "": + return fmt.Sprintf("%s %s/hour", o.Spec.Payment.Price.PerHour, asset) + } + return "" +} + +// discoverLiveHostGateways walks /sell-inference//gateway.pid +// for each persisted sell-inference deployment and reports those with a +// PID file pointing at an alive process. Mirrors the readGatewayPID + +// processAlive logic in cmd/obol/sell.go to avoid an import-from-cmd +// cycle. +func discoverLiveHostGateways(cfg *config.Config) []LiveHostGateway { + root := filepath.Join(cfg.StateDir, "sell-inference") + entries, err := os.ReadDir(root) + if err != nil { + return nil + } + out := make([]LiveHostGateway, 0, len(entries)) + for _, e := range entries { + if !e.IsDir() { + continue + } + pidPath := filepath.Join(root, e.Name(), "gateway.pid") + data, err := os.ReadFile(pidPath) + if err != nil { + continue + } + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil || pid <= 0 { + continue + } + if !pidAlive(pid) { + continue + } + out = append(out, LiveHostGateway{Name: e.Name(), PID: pid}) + } + sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name }) + return out +} + +func pidAlive(pid int) bool { + p, err := os.FindProcess(pid) + if err != nil { + return false + } + // Signal 0 doesn't deliver — just probes existence/permission on Unix. + return p.Signal(syscall.Signal(0)) == nil +} + +// errSafetyAborted lets cmd/obol/stack.go distinguish "user said no" from +// other errors. The CLI maps this to exit code 0 with a brief message +// instead of a noisy error trace. +var errSafetyAborted = errors.New("aborted by operator at safety prompt") + +// ErrSafetyAborted is exported for callers that want to detect the abort +// path with errors.Is. +func ErrSafetyAborted() error { return errSafetyAborted } diff --git a/internal/stack/safety_test.go b/internal/stack/safety_test.go new file mode 100644 index 00000000..e0fa3eb3 --- /dev/null +++ b/internal/stack/safety_test.go @@ -0,0 +1,174 @@ +package stack + +import ( + "bytes" + "errors" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/ObolNetwork/obol-stack/internal/config" + "github.com/ObolNetwork/obol-stack/internal/ui" +) + +// newTestCfg returns a Config rooted at t.TempDir() so safety_test does not +// touch real cluster paths. ConfigDir has no kubeconfig.yaml so the cluster +// half of DiscoverLiveServices short-circuits via kubectl.EnsureCluster. +func newTestCfg(t *testing.T) *config.Config { + t.Helper() + tmp := t.TempDir() + return &config.Config{ + ConfigDir: filepath.Join(tmp, "config"), + DataDir: filepath.Join(tmp, "data"), + BinDir: filepath.Join(tmp, "bin"), + StateDir: filepath.Join(tmp, "state"), + } +} + +func writeGatewayPID(t *testing.T, cfg *config.Config, name string, pid int) { + t.Helper() + dir := filepath.Join(cfg.StateDir, "sell-inference", name) + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + if err := os.WriteFile(filepath.Join(dir, "gateway.pid"), []byte(strconv.Itoa(pid)), 0o644); err != nil { + t.Fatalf("write pid: %v", err) + } +} + +func TestConfirmRunningServicesLoss_EmptyPassesThrough(t *testing.T) { + cfg := newTestCfg(t) + var buf bytes.Buffer + u := ui.NewForTest(&buf, &buf) + + proceed, err := ConfirmRunningServicesLoss(cfg, u, "obol stack down", false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !proceed { + t.Fatal("empty snapshot must pass through (returned false)") + } + if buf.Len() != 0 { + t.Errorf("expected silent pass-through, got output: %q", buf.String()) + } +} + +func TestConfirmRunningServicesLoss_NonInteractiveLiveServiceFailsClosed(t *testing.T) { + cfg := newTestCfg(t) + writeGatewayPID(t, cfg, "aeon", os.Getpid()) // our own PID is guaranteed alive + + var buf bytes.Buffer + u := ui.NewForTest(&buf, &buf) // isTTY defaults false + + proceed, err := ConfirmRunningServicesLoss(cfg, u, "obol stack down", false) + if err == nil { + t.Fatal("expected non-nil error when live gateways exist in non-interactive shell without --yes") + } + if !strings.Contains(err.Error(), "--yes") { + t.Errorf("error must mention --yes (operator escape hatch): %v", err) + } + if proceed { + t.Error("must not proceed when the safety gate trips") + } +} + +func TestConfirmRunningServicesLoss_SkipConfirmPassesThrough(t *testing.T) { + cfg := newTestCfg(t) + writeGatewayPID(t, cfg, "aeon", os.Getpid()) + + var buf bytes.Buffer + u := ui.NewForTest(&buf, &buf) + + proceed, err := ConfirmRunningServicesLoss(cfg, u, "obol stack down", true) + if err != nil { + t.Fatalf("unexpected err with --yes: %v", err) + } + if !proceed { + t.Fatal("--yes must let the action proceed") + } + if !strings.Contains(buf.String(), "--yes") { + t.Errorf("operator should still see a warning, got: %q", buf.String()) + } +} + +func TestConfirmRunningServicesLoss_DeadGatewayDoesNotTrigger(t *testing.T) { + cfg := newTestCfg(t) + // PID 0 is treated as invalid; reserve a fresh PID by forking would be + // flaky in unit tests, so use a definitely-not-existing PID. + writeGatewayPID(t, cfg, "stale", 2147483645) + + var buf bytes.Buffer + u := ui.NewForTest(&buf, &buf) + + proceed, err := ConfirmRunningServicesLoss(cfg, u, "obol stack down", false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !proceed { + t.Fatal("dead PID file must be ignored — should pass through") + } +} + +func TestErrSafetyAborted_IsExported(t *testing.T) { + if !errors.Is(ErrSafetyAborted(), errSafetyAborted) { + t.Fatal("ErrSafetyAborted() must wrap the package sentinel") + } +} + +func TestRawOffer_GateReadyRequiresBothConditions(t *testing.T) { + cases := []struct { + name string + conds [][2]string // (type, status) + wantGate bool + }{ + {"both true", [][2]string{{"PaymentGateReady", "True"}, {"RoutePublished", "True"}}, true}, + {"payment only", [][2]string{{"PaymentGateReady", "True"}}, false}, + {"route only", [][2]string{{"RoutePublished", "True"}}, false}, + {"both false", [][2]string{{"PaymentGateReady", "False"}, {"RoutePublished", "False"}}, false}, + {"payment true route false", [][2]string{{"PaymentGateReady", "True"}, {"RoutePublished", "False"}}, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var o rawOffer + for _, c := range tc.conds { + o.Status.Conditions = append(o.Status.Conditions, struct { + Type string `json:"type"` + Status string `json:"status"` + }{Type: c[0], Status: c[1]}) + } + if got := o.gateReady(); got != tc.wantGate { + t.Errorf("gateReady() = %v, want %v", got, tc.wantGate) + } + }) + } +} + +func TestRawOffer_PriceSummary(t *testing.T) { + mk := func(perRequest, perMTok, perHour, asset string) rawOffer { + var o rawOffer + o.Spec.Payment.Price.PerRequest = perRequest + o.Spec.Payment.Price.PerMTok = perMTok + o.Spec.Payment.Price.PerHour = perHour + o.Spec.Payment.Asset.Symbol = asset + return o + } + cases := []struct { + name string + in rawOffer + want string + }{ + {"per request USDC default", mk("0.001", "", "", ""), "0.001 USDC/request"}, + {"per MTok OBOL", mk("", "23", "", "OBOL"), "23 OBOL/MTok"}, + {"per hour", mk("", "", "5", "USDC"), "5 USDC/hour"}, + {"empty", mk("", "", "", ""), ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := tc.in.priceSummary(); got != tc.want { + t.Errorf("priceSummary() = %q, want %q", got, tc.want) + } + }) + } +} diff --git a/internal/stack/stack.go b/internal/stack/stack.go index 0f4b111f..0244752a 100644 --- a/internal/stack/stack.go +++ b/internal/stack/stack.go @@ -255,7 +255,7 @@ func Up(cfg *config.Config, u *ui.UI, wildcardDNS bool) error { } // Down stops the cluster and the DNS resolver container. -func Down(cfg *config.Config, u *ui.UI) error { +func Down(cfg *config.Config, u *ui.UI, skipConfirm bool) error { stackID := getStackID(cfg) if stackID == "" { return errors.New("stack ID not found, stack may not be initialized") @@ -266,6 +266,20 @@ func Down(cfg *config.Config, u *ui.UI) error { return fmt.Errorf("failed to load backend: %w", err) } + // Refuse to tear down a stack that is currently serving paid traffic + // without explicit operator confirmation. Failing closed here is the + // safety bar that prevents a stray non-interactive invocation (the + // 2026-05-22 inference.v1337.org outage was an `ssh host ''` + // against the wrong stack ID). + proceed, err := ConfirmRunningServicesLoss(cfg, u, "obol stack down", skipConfirm) + if err != nil { + return err + } + if !proceed { + u.Info("Aborted.") + return nil + } + // Cluster delete invalidates any active quick tunnel URL. Warn first so // users with sellers registered against the URL aren't blindsided. currentURL, _ := tunnel.GetTunnelURL(cfg) @@ -282,7 +296,21 @@ func Down(cfg *config.Config, u *ui.UI) error { } // Purge deletes the cluster config and optionally data -func Purge(cfg *config.Config, u *ui.UI, force bool) error { +func Purge(cfg *config.Config, u *ui.UI, force, skipConfirm bool) error { + // Refuse to purge a stack that is currently serving paid traffic + // without explicit operator confirmation. --force keeps its existing + // meaning ("delete data dir even when root-owned") and does NOT imply + // --yes — operator muscle memory for `obol stack purge --force` still + // hits this gate. + proceed, err := ConfirmRunningServicesLoss(cfg, u, "obol stack purge", skipConfirm) + if err != nil { + return err + } + if !proceed { + u.Info("Aborted.") + return nil + } + // When --force is set, data dir will be deleted — offer wallet backup. if force { openclaw.PromptBackupBeforePurge(cfg, u) @@ -422,7 +450,9 @@ func syncDefaults(cfg *config.Config, u *ui.UI, kubeconfigPath string, dataDir s } } - if downErr := Down(cfg, u); downErr != nil { + // Internal cleanup of a half-deployed stack — skip the safety + // prompt; the operator did not invoke `obol stack down` here. + if downErr := Down(cfg, u, true); downErr != nil { u.Warnf("Failed to stop cluster during cleanup: %v", downErr) }