Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions core/services/workflows/syncer/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ type ORM interface {
// creation since they don't support async initialization hooks.
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error)

// DrainableService extends services.Service with non-blocking drain semantics.
// An engine that implements this interface can be drained (stop accepting new work)
// without blocking the caller, then closed once all in-flight work completes.
// The deletion handler uses this for graceful, retryable teardown: call Drain,
// re-check ActiveExecutions on each retry, and only Close once it reaches zero.
type DrainableService interface {
services.Service
// Drain signals the service to stop accepting new work. Non-blocking and idempotent.
Drain()
// ActiveExecutions returns the count of in-flight executions.
// A zero reading after Drain() means no work remains and Close() should be fast.
ActiveExecutions() int32
// IsDraining returns true if Drain() has been called.
IsDraining() bool
}

// ErrDrainInProgress indicates the engine is draining and deletion will be retried.
// Handlers wrap it (fmt.Errorf("%w: ...")), so callers must match with errors.Is
// rather than direct equality.
var ErrDrainInProgress = errors.New("drain in progress")

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding method that handles the event.
type eventHandler struct {
services.Service
Expand Down Expand Up @@ -363,8 +379,12 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
}
}()

if err := h.workflowPausedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
if err = h.workflowPausedEvent(ctx, payload); err != nil {
if errors.Is(err, ErrDrainInProgress) {
logCustMsg(ctx, cma, fmt.Sprintf("workflow pause deferred: %v", err), h.lggr)
} else {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
}
return err
}

Expand Down Expand Up @@ -417,8 +437,12 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
}
}()

if herr := h.workflowDeletedEvent(ctx, payload); herr != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", herr), h.lggr)
if herr = h.workflowDeletedEvent(ctx, payload); herr != nil {
if errors.Is(herr, ErrDrainInProgress) {
logCustMsg(ctx, cma, fmt.Sprintf("workflow deletion deferred: %v", herr), h.lggr)
} else {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", herr), h.lggr)
}
return herr
}

Expand Down Expand Up @@ -497,8 +521,15 @@ func (h *eventHandler) workflowRegisteredEvent(
// We know we need an engine, let's make sure that there isn't already one running for this workflow ID.
prevEngine, ok := h.engineRegistry.Get(payload.WorkflowID)
if ok && prevEngine.Ready() == nil && spec.Status == job.WorkflowSpecStatusActive {
// This is the happy-path, we're done.
return nil
// Check if the engine is draining (pending deletion) — if so, it needs replacement.
drainable, isDrainable := prevEngine.Service.(DrainableService)
if !isDrainable || !drainable.IsDraining() {
// This is the happy-path, we're done.
return nil
}
h.lggr.Infow("Engine is draining, will replace with new engine",
"workflowID", payload.WorkflowID.Hex())
// Fall through to cleanup + recreate below
}

// Any other case ->
Expand Down Expand Up @@ -736,7 +767,31 @@ func (h *eventHandler) workflowDeletedEvent(
// prior steps fail.
e, ok := h.engineRegistry.Get(payload.WorkflowID)
if ok {
if innerErr := e.Close(); innerErr != nil {
// Try drain-aware path if engine supports it
if drainable, isDrainable := e.Service.(DrainableService); isDrainable {
// Phase 1: Signal drain (non-blocking, idempotent)
if !drainable.IsDraining() {
drainable.Drain()
h.lggr.Infow("Initiated drain for workflow engine", "workflowID", payload.WorkflowID.Hex())
}

// Phase 2: Check if all executions have completed
active := drainable.ActiveExecutions()
if active > 0 {
h.lggr.Infow("Workflow deletion deferred: active executions still running",
"workflowID", payload.WorkflowID.Hex(),
"activeExecutions", active)
return fmt.Errorf("%w: %d active executions still running", ErrDrainInProgress, active)
}

// Phase 3: All executions done, Close() is now fast
h.lggr.Infow("Workflow engine drained, closing", "workflowID", payload.WorkflowID.Hex())
}

// Close the engine (fast if drained, or blocking for legacy engines).
// Ignore ErrAlreadyStopped — engine may have been closed on a previous
// attempt that succeeded at Close() but failed at artifact deletion.
if innerErr := e.Close(); innerErr != nil && !errors.Is(innerErr, services.ErrAlreadyStopped) {
return fmt.Errorf("failed to close workflow engine: %w", innerErr)
}
}
Expand Down
9 changes: 9 additions & 0 deletions core/services/workflows/syncer/v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type metrics struct {
reconcileEventsDispatched metric.Int64Histogram // events dispatched per source per tick
reconcileDuration metric.Int64Histogram // wall-clock ms for parallel event processing
reconcileEventsBackoff metric.Int64Counter // events skipped due to backoff

// Drain metrics
drainingWorkflows metric.Int64Gauge // 1=draining, 0=not draining per workflowID
}

func (m *metrics) recordHandleDuration(ctx context.Context, d time.Duration, event string, success bool) {
Expand Down Expand Up @@ -136,6 +139,11 @@ func newMetrics() (*metrics, error) {
return nil, err
}

drainingWorkflows, err := beholder.GetMeter().Int64Gauge("platform_workflow_registry_syncer_draining_workflows")
if err != nil {
return nil, err
}

return &metrics{
handleDuration: handleDuration,
fetchedWorkflows: fetchedWorkflows,
Expand All @@ -148,5 +156,6 @@ func newMetrics() (*metrics, error) {
reconcileEventsDispatched: reconcileEventsDispatched,
reconcileDuration: reconcileDuration,
reconcileEventsBackoff: reconcileEventsBackoff,
drainingWorkflows: drainingWorkflows,
}, nil
}
39 changes: 39 additions & 0 deletions core/services/workflows/v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ type Engine struct {
tracer trace.Tracer

orgID string

// draining is set to true when a delete/pause event is pending.
// When true, no new executions will be started by handleAllTriggerEvents.
draining atomic.Bool

// activeExecutions tracks the number of currently in-flight workflow executions.
activeExecutions atomic.Int32
}

type triggerCapability struct {
Expand Down Expand Up @@ -212,6 +219,27 @@ func NewEngine(cfg *EngineConfig) (*Engine, error) {
return engine, nil
}

// Drain signals the engine to stop accepting new executions.
// Non-blocking and idempotent. In-flight executions continue to completion.
// Does NOT unregister triggers (that stays in close()) to avoid breaking
// ackTriggerEvent for in-flight executions.
// Sets a health condition so that Healthy() returns an error, allowing the
// registration handler to detect that this engine needs replacement.
func (e *Engine) Drain() {
// The flag is checked by handleAllTriggerEvents before scheduling any new
// execution; setting it first ensures no new work starts after this call.
e.draining.Store(true)
// Mark the service unhealthy under the "draining" condition so external
// health checks surface the pending-deletion state.
e.srvcEng.SetHealthCond("draining", errors.New("engine is draining, pending deletion"))
}

// IsDraining reports whether Drain() has been called on this engine.
// Safe for concurrent use: backed by an atomic.Bool, never reset once set.
func (e *Engine) IsDraining() bool {
return e.draining.Load()
}

// ActiveExecutions returns the count of currently in-flight workflow executions.
// The counter is incremented before each execution goroutine is spawned and
// decremented (via defer) when the execution finishes, so a zero reading after
// Drain() indicates no execution work remains for this engine.
func (e *Engine) ActiveExecutions() int32 {
return e.activeExecutions.Load()
}

func (e *Engine) start(ctx context.Context) error {
e.cfg.Module.Start()
ctx = context.WithoutCancel(ctx)
Expand Down Expand Up @@ -589,6 +617,13 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
if err != nil {
return
}

// Check if engine is draining — reject new executions
if e.draining.Load() {
e.logger().Infow("Engine is draining, skipping trigger event", "eventID", queueHead.event.Event.ID)
return // exit the loop entirely
}

eventAge := queueHead.timestamp.Sub(e.cfg.Clock.Now())
eventID := queueHead.event.Event.ID
e.logger().Debugw("Popped a trigger event from the queue", "eventID", eventID, "eventAgeMs", eventAge.Milliseconds())
Expand All @@ -606,9 +641,13 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
e.logger().Errorw("Failed to acquire executions semaphore", "err", err)
continue
}

e.activeExecutions.Add(1) // increment BEFORE GoCtx to prevent TOCTOU race

e.logger().Debugw("Scheduling a trigger event for execution", "eventID", eventID)
e.srvcEng.GoCtx(context.WithoutCancel(ctx), func(ctx context.Context) {
defer free()
defer e.activeExecutions.Add(-1) // decrement when execution finishes
creCtx := contexts.CREValue(ctx)
// Tracer is no-op if DebugMode is false
ctx, span := e.tracer.Start(ctx, "workflow_execution",
Expand Down
Loading