Skip to content

Commit 90d2bb8

Browse files
authored
Merge pull request #59 from ethpandaops/fix/leadership-callback-notification
fix(leaderelection): add callback-based notification for guaranteed delivery
2 parents ffcdc41 + 91a2a6d commit 90d2bb8

5 files changed

Lines changed: 1106 additions & 48 deletions

File tree

pkg/leaderelection/interface.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import (
55
"time"
66
)
77

8+
// LeadershipCallback is a function invoked when leadership status changes.
9+
// The callback is invoked synchronously - implementations should return quickly (< 100ms)
10+
// to avoid delaying leadership renewal. Long-running operations should be spawned
11+
// in a separate goroutine.
12+
type LeadershipCallback func(ctx context.Context, isLeader bool)
13+
814
// Elector defines the interface for leader election implementations.
915
type Elector interface {
1016
// Start begins the leader election process
@@ -18,8 +24,19 @@ type Elector interface {
1824

1925
// LeadershipChannel returns a channel that receives leadership changes
2026
// true = gained leadership, false = lost leadership
27+
//
28+
// Deprecated: Use OnLeadershipChange for guaranteed delivery.
29+
// This channel may drop events if the buffer is full.
2130
LeadershipChannel() <-chan bool
2231

32+
// OnLeadershipChange registers a callback for guaranteed leadership notification.
33+
// The callback is invoked synchronously when leadership status changes.
34+
// Multiple callbacks can be registered and will be invoked in registration order.
35+
//
36+
// Important: Keep callbacks fast (< 100ms) to avoid delaying leadership renewal.
37+
// For long-running operations, spawn a goroutine within the callback.
38+
OnLeadershipChange(callback LeadershipCallback)
39+
2340
// GetLeaderID returns the current leader's ID
2441
GetLeaderID() (string, error)
2542
}

pkg/leaderelection/redis_election.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ type RedisElector struct {
2828
leadershipStartTime time.Time
2929
stopped bool
3030

31+
// Callback-based notification (guaranteed delivery)
32+
callbacksMu sync.RWMutex
33+
callbacks []LeadershipCallback
34+
3135
leadershipChan chan bool
3236
stopChan chan struct{}
3337
wg sync.WaitGroup
@@ -154,6 +158,29 @@ func (e *RedisElector) GetLeaderID() (string, error) {
154158
return val, nil
155159
}
156160

161+
// OnLeadershipChange registers a callback for guaranteed leadership notification.
162+
// The callback is invoked synchronously when leadership status changes.
163+
// Multiple callbacks can be registered and will be invoked in registration order.
164+
func (e *RedisElector) OnLeadershipChange(callback LeadershipCallback) {
165+
e.callbacksMu.Lock()
166+
defer e.callbacksMu.Unlock()
167+
168+
e.callbacks = append(e.callbacks, callback)
169+
}
170+
171+
// notifyLeadershipChange invokes all registered callbacks with the leadership status.
172+
// Callbacks are invoked synchronously in registration order.
173+
func (e *RedisElector) notifyLeadershipChange(ctx context.Context, isLeader bool) {
174+
e.callbacksMu.RLock()
175+
callbacks := make([]LeadershipCallback, len(e.callbacks))
176+
copy(callbacks, e.callbacks)
177+
e.callbacksMu.RUnlock()
178+
179+
for _, callback := range callbacks {
180+
callback(ctx, isLeader)
181+
}
182+
}
183+
157184
// run is the main election loop.
158185
func (e *RedisElector) run(ctx context.Context) {
159186
defer e.wg.Done()
@@ -163,7 +190,7 @@ func (e *RedisElector) run(ctx context.Context) {
163190

164191
// Try to acquire leadership immediately
165192
if e.tryAcquireLeadership(ctx) {
166-
e.handleLeadershipGain()
193+
e.handleLeadershipGain(ctx)
167194
}
168195

169196
for {
@@ -176,12 +203,12 @@ func (e *RedisElector) run(ctx context.Context) {
176203
if e.IsLeader() {
177204
// Try to renew leadership
178205
if !e.renewLeadership(ctx) {
179-
e.handleLeadershipLoss()
206+
e.handleLeadershipLoss(ctx)
180207
}
181208
} else {
182209
// Try to acquire leadership
183210
if e.tryAcquireLeadership(ctx) {
184-
e.handleLeadershipGain()
211+
e.handleLeadershipGain(ctx)
185212
}
186213
}
187214
}
@@ -304,9 +331,13 @@ func (e *RedisElector) releaseLeadership(ctx context.Context) error {
304331
}
305332

306333
// handleLeadershipGain is called when leadership is acquired.
307-
func (e *RedisElector) handleLeadershipGain() {
334+
func (e *RedisElector) handleLeadershipGain(ctx context.Context) {
308335
e.log.Info("Gained leadership")
309336

337+
// Notify callbacks first (guaranteed delivery)
338+
e.notifyLeadershipChange(ctx, true)
339+
340+
// Send to channel for backward compatibility (best-effort)
310341
select {
311342
case e.leadershipChan <- true:
312343
default:
@@ -315,7 +346,7 @@ func (e *RedisElector) handleLeadershipGain() {
315346
}
316347

317348
// handleLeadershipLoss is called when leadership is lost.
318-
func (e *RedisElector) handleLeadershipLoss() {
349+
func (e *RedisElector) handleLeadershipLoss(ctx context.Context) {
319350
e.mu.Lock()
320351
wasLeader := e.isLeader
321352
e.isLeader = false
@@ -330,6 +361,10 @@ func (e *RedisElector) handleLeadershipLoss() {
330361

331362
e.log.Info("Lost leadership")
332363

364+
// Notify callbacks first (guaranteed delivery)
365+
e.notifyLeadershipChange(ctx, false)
366+
367+
// Send to channel for backward compatibility (best-effort)
333368
select {
334369
case e.leadershipChan <- false:
335370
default:

0 commit comments

Comments
 (0)