EvenScheduler: opt-in round-robin rebalance onto returning idle supervisors#8778
EvenScheduler: opt-in round-robin rebalance onto returning idle supervisors#8778mwkang wants to merge 2 commits into
Conversation
EvenScheduler/DefaultScheduler do not move workers onto a supervisor
that returns to service after maintenance: the topology already has
its desired worker count spread across the surviving supervisors, so
the returned supervisor sits at used=0 until an operator rebalances by
hand. Add an opt-in, binary-trigger pass that relocates workers onto
such idle supervisors, round-robin across topologies, in a single
scheduling round. The feature is disabled by default, so existing
clusters see no behavior change.
needsScheduling is deliberately left untouched. The new trigger lives
in Cluster.hasIdleSupervisorReusableBy and is reached only from
EvenScheduler.redistributeOntoIdleSupervisors, which runs at the top of
scheduleTopologiesEvenly and DefaultScheduler.defaultSchedule.
ResourceAwareScheduler (needsSchedulingRas) and the multitenant pools
keep their existing needsScheduling behavior and never enter the new
path, so the feature is scoped to EvenScheduler/DefaultScheduler (and
the leftover topologies IsolationScheduler delegates to them) only.
The trigger is binary -- it fires only when at least one stable,
non-blacklisted supervisor has zero used slots and the topology is not
already on it -- so an "almost balanced" cluster never moves. Each
topology contributes at most one worker per round-robin iteration, so
the returned supervisor ends up hosting workers from several topologies
(preserving the per-supervisor workload diversity a fresh submission
has) instead of letting the first scheduled topology grab the whole
idle capacity. Per-topology relocations in one round are capped at
floor(numWorkers / nonBlacklistedSupervisorCount) * idleSupervisorCount,
tightened further by max.free.per.topology when positive. Workers are
pulled from the supervisor where the topology has the most workers
(ties broken by supervisor id, lexicographically), never draining one
below a single worker, and each pulled worker is placed directly onto
an idle slot so the regular sortSlots/interleave pass cannot drop it
back into the just-vacated slot.
- DaemonConfig / conf/defaults.yaml (dot-only keys):
nimbus.even.rebalance.idle.supervisor.enabled (false)
nimbus.even.rebalance.max.free.per.topology (0 = unbounded)
nimbus.even.rebalance.idle.supervisor.min.stable.rounds (3)
- Cluster: new hasIdleSupervisorReusableBy (trigger) plus
isIdleSupervisorAvailableForEvenRebalance and
hasMinimumIdleSupervisorStability (eligibility + uptime guard,
uptime >= min.stable.rounds * supervisor.monitor.frequency.secs)
that skips a just-returned, possibly-flapping supervisor. All gated
by the enabled flag; needsScheduling itself is unchanged.
- SupervisorDetails.uptimeSecs surfaced from SupervisorInfo so the
uptime guard can be evaluated; legacy constructors default it to
Long.MAX_VALUE (always stable) to leave existing callers unchanged.
- EvenScheduler.redistributeOntoIdleSupervisors returns immediately
when the feature is disabled, so a default (disabled) cluster does
no per-scheduling-round supervisor scanning.
- Add TestEvenSchedulerIdleSupervisor covering the trigger, the
per-topology drain cap, single-worker no-op, one-round even
distribution, round-robin sharing across topologies, the uptime
flap guard, deterministic donor tie-break, blacklist handling, the
DefaultScheduler leftover-subset path, and the IsolationScheduler
interaction (idle non-isolated target only; a reserved host stays
out even when its isolated topology is down).
|
Disclaimer: ran this through an LLM for thoroughness Most of the following items are nice-to-haves, but can you check numbers 4 and 5? 1. 🟡 Medium — Missing comment on the null-guard in for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
if (topologies.getById(topology.getId()) == null) {
continue;
}This guard is non-obvious — it exists because 2. 🟢 Low —
3. 🟢 Low — The class goes from 6 to 10 constructors. This is consistent with the existing style, but a Tests4. 🔴 High — Missing total-worker count assertions Several tests verify per-supervisor slot counts but don't assert that the total number of In assertEquals(1, usedSlotCount(cluster, "sup-0"));
assertEquals(1, usedSlotCount(cluster, "sup-1"));
assertEquals(1, usedSlotCount(cluster, "sup-2"));Imagine a bug in Adding one line to every relocating test closes this entirely: // topology declared numWorkers=3; relocation must preserve that
assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)));Affected tests: 5. 🔴 High — Round-robin test under-asserts
assertTrue(slotsOnSup2ForTopoA >= 1);
assertTrue(slotsOnSup2ForTopoB >= 1);Consider a broken implementation where the inner topology loop exits after the first The fix is to assert exact counts and verify worker totals per topology: // sup-2 has 4 slots; with 2 topologies, round-robin gives each exactly 1 slot
// (each topology's budget = floor(4 workers / 3 supervisors) * 1 idle = 1)
assertEquals(1, slotsOnSup2ForTopoA);
assertEquals(1, slotsOnSup2ForTopoB);
// Total worker count must be preserved for both topologies
assertEquals(4, cluster.getAssignedNumWorkers(topoA));
assertEquals(4, cluster.getAssignedNumWorkers(topoB));
// No supervisor should have been drained to zero
assertTrue(usedSlotCount(cluster, "sup-0") > 0);
assertTrue(usedSlotCount(cluster, "sup-1") > 0);Without these, the test is really only checking "something happened on sup-2", not that Tests 7. 🟢 Low — This test hand-builds its conf map and omits 8. 🟢 Low — Setup duplication
|
- Assert getAssignedNumWorkers on every relocating test (incl. the donor tie-break case) so a relocation that loses executors no longer passes on slot counts alone. - Tighten multipleTopologies_shareIdleSlotsRoundRobin to exact per-topology counts plus donor-not-drained, so a broken inner loop can no longer pass. - Parameterize the flap-guard boundary test over threshold-1/threshold/ threshold+1. - Hoist the enabled flag and idle-supervisor set out of the per-topology loop via topologyCanReuseIdleSupervisor, dropping the redundant per-topology config read and supervisor rescan. - Route the remaining hand-built test fixtures through the shared newCluster/evenRebalanceConf/buildAssignment helpers. - Document the needsSchedulingTopologies null-guard in DefaultScheduler and EvenScheduler, spelling out the full-set / single-topology / leftover-subset caller cases.
|
Thanks for the thorough review — and for flagging 4 and 5 specifically; those were the right things to tighten. I've pushed a follow-up addressing the feedback. 4 (total-worker assertions) — Done. Every relocating test now asserts 5 (round-robin under-asserts) — Done. 1 (null-guard comment) — Added a comment above the guard in both 2 (redundant config re-read / supervisor rescan) — Done. The feature-flag check is hoisted to the top of 6 (flap-guard boundary) — Done. The two methods are merged into a single 7 (conf inconsistency) — Done. 8 (setup duplication) — Done. Extracted a shared 3 (SupervisorDetails constructor count) — Left as constructors for now. The new Thanks again! |
rzo1
left a comment
There was a problem hiding this comment.
Nice, thanks for the PR! A few non-blocking items.
-
Cluster.hasIdleSupervisorReusableBy(...)is dead in production. The scheduling path uses the privatetopologyCanReuseIdleSupervisor(...)instead (with a comment noting it's "equivalent … but reuses the pre-computed set"), so the publicClustermethod is reached only by the tests and the description's call-path table. A public API on a core class kept alive solely for assertions is a smell, and the two implementations can drift. Either route the production trigger through it, or drop it and assert againstredistributeOntoIdleSupervisors' observable effect. -
Two opposite "unknown uptime" defaults. The legacy
SupervisorDetailsconstructors defaultuptimeSecstoLong.MAX_VALUE(always eligible), whileNimbus.supervisorUptimeSecs()returns0Lwhen unset (never eligible). Production always takes the0Lbranch — conservative, good — soLong.MAX_VALUEonly affects callers that don't use this feature. Defensible, but the opposite defaults surprise; one short comment on each would help the next reader. -
Budget is based on declared workers, not assigned.
target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCountuses declarednumWorkers, so for an under-assigned topology it over-estimates.relocateOneWorkerOntoIdleSlotreturns false once no donor has ≥2 workers, so it self-limits and can't over-move — but a one-line javadoc noting the cap is an upper bound, not a guarantee, would be accurate. -
freeSlotthenassignis non-atomic. Ifassignever threw afterfreeSlot(victim), the executors are orphaned for that round (recovered by the normal pass). Target is a verified idle free slot so this is near-impossible in practice — a comment noting the ordering assumption is enough. -
Minor:
@VisibleForTestingonredistributeOntoIdleSupervisorsreads as "would be private," but it's genuinely the cross-class entry point (called fromDefaultSchedulerandEvenScheduler). Slightly misleading annotation.
Test coverage
Two parts of the budget formula aren't actually exercised by the current cases:
- The
idleSupervisorCountmultiplier always resolves to* 1. Every test has exactly one usable idle supervisor (sup-2; the reserved-host test blacklistssup-2, leaving onlysup-3), so a regression in the* idleSupervisorCountterm would go unnoticed. A case with two simultaneously-idle supervisors would cover it. - The
max.free.per.topologyclamp is never the binding constraint. InredistributeRelocatesAtMostMaxFreeWorkersPerTopologythe even-budget isfloor(3/3)*1 = 1andmaxFree = 1, so the two coincide and the test still passes even ifMath.min(target, maxFree)were removed. A fixture where the even-budget is ≥2 butmaxFree = 1would pin down the clamp specifically.
Overall LGTM once #1 is resolved one way or the other. 👍
What is the purpose of the change
EvenScheduler(and thereforeDefaultScheduler) does not move workers onto a supervisor that returns to service after maintenance. The topology's desired worker count is already satisfied across the surviving supervisors, soneedsSchedulingreports nothing to do and the returned supervisor sits atused = 0until an operator manually rebalances or restarts every affected topology.This PR adds an opt-in, binary-trigger pass to
EvenSchedulerthat relocates already-assigned workers onto such idle supervisors, round-robin across topologies, in a single scheduling round. It is disabled by default, so existing clusters see no behavior change. Implements the proposal in #8590 and folds in the review feedback from that thread.How it works
The trigger and the relocation live entirely on the
EvenSchedulerpath;Cluster.needsSchedulingis intentionally left unchanged (see Scope below).Cluster.hasIdleSupervisorReusableBy(topology)returns true only when at least one stable, non-blacklisted supervisor has zero used slots and the topology is not already on it. Because the check is binary (a supervisor either has zero used slots or it does not), it never fires for an "almost balanced" cluster, so no time-based cooldown is needed.floor(numWorkers / nonBlacklistedSupervisorCount) * idleSupervisorCountworkers per round, tightened further bynimbus.even.rebalance.max.free.per.topologywhen positive. A topology whose budget computes to0(typicallynumWorkers < supervisorCount) is skipped entirely — this is also what stops a single-worker topology from ping-ponging.EvenScheduler.redistributeOntoIdleSupervisorswalks the eligible topologies (ordered by id) and moves at most one worker per topology per iteration until the idle slots are exhausted. A single returning supervisor therefore ends up hosting workers from several topologies, preserving the per-supervisor workload diversity a fresh submission has, instead of letting the first scheduled topology grab the entire idle capacity.cluster.freeSlot()+cluster.assign(), bypassing the regularsortSlots/interleave pass that would otherwise drop some of them straight back into the just-vacated slots.Nimbuspropagates the result the usual way: it diffs the resulting assignments against the existing ones and pushes the delta, so the relocation takes effect even thoughneedsSchedulingis untouched.Scope: RAS, Multitenant, Isolation
The feature is scoped to
EvenScheduler/DefaultScheduler(and the leftover topologiesIsolationSchedulerdelegates to them).Cluster.needsSchedulingis deliberately not modified — the new logic lives in three newClustermethods (hasIdleSupervisorReusableBy,isIdleSupervisorAvailableForEvenRebalance,hasMinimumIdleSupervisorStability) reached only fromEvenScheduler.redistributeOntoIdleSupervisors. This keeps any scheduler that consultsneedsSchedulingfrom picking up a surprise "needs rescheduling" signal.Call-path audit:
EvenScheduler.scheduleTopologiesEvenlyneedsScheduling(unchanged)redistributeOntoIdleSupervisorsdirectly; gated by the default-off flagDefaultScheduler.defaultScheduleneedsScheduling(unchanged)IsolationSchedulerDefaultScheduler.defaultScheduleResourceAwareSchedulerneedsSchedulingRas(unchanged)MultitenantScheduler(DefaultPool/IsolatedPool)cluster.needsScheduling(unchanged)redistribute;needsSchedulingitself is unmodifiedIn words: RAS is intentionally out of scope — it uses
needsSchedulingRasand a different placement engine; a parallel mechanism, if wanted, belongs in a follow-up. Multitenant pools do callneedsScheduling, but since that method is unchanged they are unaffected. Isolation: bothhasIdleSupervisorReusableByandredistributeOntoIdleSupervisorsskip blacklisted supervisors, andIsolationSchedulerrepresents a reserved host by blacklisting it before delegating leftovers — so an isolated host can never be a donor or a target, including the case where its isolated topology is down and the reserved host looks idle.Configuration
All keys are dot-only, matching Storm's convention.
nimbus.even.rebalance.idle.supervisor.enabledfalsenimbus.even.rebalance.max.free.per.topology00= unbounded; the even-distribution budget applies)nimbus.even.rebalance.idle.supervisor.min.stable.rounds30disables the guardThe flap guard keeps workers off a supervisor that has only just returned and may still be flapping on a slow JVM startup or a transient network blip. A supervisor is eligible only once it has been up for at least
min.stable.rounds * supervisor.monitor.frequency.secs(≈9s with the defaults). It reusesSupervisorInfo.uptime_secs, surfaced ontoSupervisorDetails.When you would NOT want to enable this
A relocation is a worker JVM restart: brief tuple replay, JIT re-warmup, and possible windowed/stateful bolt state churn. Keep this off for:
min.stable.rounds, or leave the feature off);Blast radius
In one scheduling pass the simultaneous worker-restart count is
min(idle_slots, eligible_topologies), with each topology's contribution capped atfloor(numWorkers / nonBlacklistedSupervisorCount) * idleSupervisorCount(tightened bymax.free.per.topology). Because every relocation consumes one idle slot, the total per pass is hard-bounded by the returning supervisor's free-slot count.Worked example: one returning supervisor with 8 slots in a 50-topology cluster → 8 simultaneous worker restarts across 8 topologies in one pass, not 50.
A cluster-wide ceiling (
nimbus.even.rebalance.max.relocations.per.round) was considered but not added: the per-topology cap plus the natural idle-slot ceiling already bound the disruption, and an extra knob would only let operators throttle below "fill the returned supervisor in one pass." Happy to add it if reviewers prefer an explicit cluster-wide cap.How was this change tested
New
TestEvenSchedulerIdleSupervisor(storm-server), 16 cases:needsScheduling/needsSchedulingRaspaths staying unaffected;max.free.per.topology;DefaultSchedulerleftover-subset path;IsolationSchedulerinteraction (idle non-isolated target only; reserved host stays out even when its isolated topology is down).Backward compatibility
Default-off with no API removals — only additions. When disabled,
redistributeOntoIdleSupervisorsreturns before scanning any supervisor, so a cluster that has not opted in does no extra per-round work. ExistingSupervisorDetailsconstructors defaultuptimeSecstoLong.MAX_VALUE(always "stable"), leaving every existing caller unchanged.Closes #8590