
feat: add deadline #276

Open
varex83 wants to merge 10 commits into main from bohdan/deadline

Conversation

@varex83
Collaborator

@varex83 varex83 commented Mar 11, 2026

Summary

Implements duty deadline tracking and notification functionality in Rust (Pluto). This adds the Deadliner trait and associated logic for scheduling duty deadlines, managing timers, and sending expired duties to a channel.

Closes #274

@emlautarom1
Collaborator

emlautarom1 commented Mar 11, 2026

We already have a WIP deadline module (I added it at some point to validate another module, not sure which one): https://github.com/NethermindEth/pluto/blob/3f1c5a31089b2764b57dfac60bcefa3742186d09/crates/app/src/deadline/mod.rs. If that code is no longer relevant, it should be removed in this PR.

Comment on lines +782 to +783
// todo: uses hardcode beacon client for testing, should be refactored to use a
// real beacon client (testutils/beaconmock)

We can mock the client by mocking the underlying reqwest::Client (we'll probably need to adjust the pluto_eth1wrap::EthClient constructors a bit).

Comment on lines +314 to +334
/// Clock trait for abstracting time operations.
trait Clock: Send + Sync {
    /// Returns the current time.
    fn now(&self) -> DateTime<Utc>;

    /// Creates a sleep future that completes after the given duration.
    fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()>;
}

/// Real clock implementation using tokio::time.
struct RealClock;

impl Clock for RealClock {
    fn now(&self) -> DateTime<Utc> {
        Utc::now()
    }

    fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()> {
        tokio::time::sleep(duration).boxed()
    }
}

Mocking time with this approach caused some challenges in QBFT. We should explore tokio's built-ins for mocking time: https://docs.rs/tokio/latest/tokio/time/fn.advance.html

Comment on lines +95 to +98
fn secs_to_chrono(secs: u64) -> Result<chrono::Duration> {
    let secs_i64 = i64::try_from(secs).map_err(|_| DeadlineError::ArithmeticOverflow)?;
    chrono::Duration::try_seconds(secs_i64).ok_or(DeadlineError::DurationConversion)
}

This function is used in only one place; prefer to inline it.

/// It may only be called once and the returned channel should be used
/// by a single task. Multiple instances are required for different
/// components and use cases.
pub trait Deadliner: Send + Sync {

This is the first time we introduce BoxFuture (and for me, the first time using it). Can we use async-trait instead, if possible?

Comment on lines +130 to +152
#[async_trait]
pub trait BeaconClientForDeadline {
    /// Fetches the genesis time from the beacon node.
    async fn fetch_genesis_time(&self) -> Result<DateTime<Utc>>;

    /// Fetches the slot duration and slots per epoch from the beacon node.
    async fn fetch_slots_config(&self) -> Result<(std::time::Duration, u64)>;
}

#[async_trait]
impl BeaconClientForDeadline for EthBeaconNodeApiClient {
    async fn fetch_genesis_time(&self) -> Result<DateTime<Utc>> {
        self.fetch_genesis_time()
            .await
            .map_err(DeadlineError::FetchGenesisTime)
    }

    async fn fetch_slots_config(&self) -> Result<(std::time::Duration, u64)> {
        self.fetch_slots_config()
            .await
            .map_err(DeadlineError::FetchGenesisTime)
    }
}

No need to create a trait for this behaviour. We can take a concrete EthBeaconNodeApiClient as the client and do the mocking in the client itself if needed.

Comment on lines +247 to +250
// Use far-future sentinel date (9999-01-01) matching Go implementation
// This timestamp is a known constant and will never fail
let mut curr_deadline =
DateTime::from_timestamp(253402300799, 0).unwrap_or(DateTime::<Utc>::MAX_UTC);

No need to use the Go constant here; the sentinel only exists so that there is always a Duty available that never resolves:

Suggested change
- // Use far-future sentinel date (9999-01-01) matching Go implementation
- // This timestamp is a known constant and will never fail
- let mut curr_deadline =
-     DateTime::from_timestamp(253402300799, 0).unwrap_or(DateTime::<Utc>::MAX_UTC);
+ // Use far-future sentinel date (9999-01-01) matching Go implementation
+ // This timestamp is a known constant and will never fail
+ let mut curr_deadline = DateTime::<Utc>::MAX_UTC;

Comment on lines +253 to +260
let Ok(deadline_opt) = deadline_func(duty.clone()) else {
continue;
};

// Ignore duties that never expire
let Some(duty_deadline) = deadline_opt else {
continue;
};

We can inline some stuff:

Suggested change
- let Ok(deadline_opt) = deadline_func(duty.clone()) else {
-     continue;
- };
- // Ignore duties that never expire
- let Some(duty_deadline) = deadline_opt else {
-     continue;
- };
+ // Ignore duties that never expire
+ let Ok(Some(duty_deadline)) = deadline_func(duty.clone()) else {
+     continue;
+ };

struct DeadlinerImpl {
cancel_token: CancellationToken,
input_tx: tokio::sync::mpsc::UnboundedSender<DeadlineInput>,
output_rx: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<Duty>>>>,

You can drop the Arc wrapper; it's not actually needed.

Suggested change
- output_rx: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<Duty>>>>,
+ output_rx: Mutex<Option<tokio::sync::mpsc::Receiver<Duty>>>,
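A minimal sketch of why the Arc can go, assuming the receiver is handed out once through a `&self` method: the `Mutex<Option<...>>` field already allows taking the receiver through a shared reference. `OnceReceiver` and `take_receiver` are hypothetical names, and std's channel stands in for tokio::sync::mpsc to keep the sketch dependency-free.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::Mutex;

/// Hypothetical holder for a receiver that may be claimed exactly once.
struct OnceReceiver<T> {
    output_rx: Mutex<Option<Receiver<T>>>,
}

impl<T> OnceReceiver<T> {
    fn new(rx: Receiver<T>) -> Self {
        Self { output_rx: Mutex::new(Some(rx)) }
    }

    /// Returns the receiver on the first call and `None` on every later call.
    fn take_receiver(&self) -> Option<Receiver<T>> {
        self.output_rx.lock().expect("mutex poisoned").take()
    }
}

fn main() {
    let (tx, rx) = channel::<u32>();
    let holder = OnceReceiver::new(rx);

    // First caller gets the receiver, the second one gets nothing.
    let first = holder.take_receiver().expect("first call yields the receiver");
    assert!(holder.take_receiver().is_none());

    tx.send(7).unwrap();
    assert_eq!(first.recv().unwrap(), 7);
}
```

If the whole struct is later shared between tasks, wrapping the struct itself in an Arc would cover this field as well.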

}

/// Fake clock implementation for testing.
#[cfg(test)]

nit: I would move all these definitions to the tests module to avoid the repeated #[cfg(test)] attribute.

@emlautarom1
Collaborator

Generally good, I don't see any potential logic/concurrency issues, but:

  • The code is clearly very Go-like, which makes it quite hard to follow (e.g., some ownership semantics are replicated through types like Option)
  • We can reduce mocking by leveraging that reqwest allows mocking clients. At the same time, we have a lot of machinery to mock time, so I suggest also looking into what Tokio provides out-of-the-box.

I'd suggest exploring some alternative options before moving forward; my main concern is all the timing bookkeeping that we do since it has already introduced issues in QBFT (we also mocked time there).

@varex83
Collaborator Author

varex83 commented Mar 23, 2026

@claude

@claude

claude bot commented Mar 23, 2026

Claude finished @varex83's task in 2m 44s


PR Review: feat: add deadline

  • Gather context and read changed files
  • Review crates/core/src/deadline.rs
  • Check reviewer comments and flag remaining issues
  • Post final review

Summary

The PR implements duty deadline tracking, porting Go's deadliner to Rust. The structural approach is sound, and the existing reviewer suggestions (sentinel value, inline patterns, Arc removal) appear to have been applied. However, a few issues remain, ranging from a critical correctness bug to style/design concerns, and they should be addressed before merge.


Findings

[Critical] Busy-loop when duties set is empty or all duties are non-expiring

get_curr_duty returns DateTime::<Utc>::MAX_UTC as a sentinel when no expiring duties exist. This sentinel is then used to compute a std::time::Duration via signed_duration_since(now).to_std(). However, chrono::Duration::to_std() fails if the duration overflows std::time::Duration (max ~584 years), so the unwrap_or(std::time::Duration::ZERO) fallback fires immediately.

This creates a tight infinite busy-loop: the sleep fires at ZERO, sends a DutyType::Unknown sentinel duty to the output channel, removes it (it's not there), calls get_curr_duty again, gets MAX_UTC again, sets sleep to ZERO... continuously.

Evidence: deadline.rs:295-300, deadline.rs:343-347, deadline.rs:375-379

// All three occurrences of this pattern are affected:
let initial_duration = curr_deadline
    .signed_duration_since(now)
    .to_std()
    .unwrap_or(std::time::Duration::ZERO);  // fires immediately for MAX_UTC sentinel

Recommendation: Use a large but representable duration as sentinel (e.g., std::time::Duration::from_secs(u64::MAX / 2)) or handle the overflow case separately to avoid resetting the sleep to zero.
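One way to act on the second half of this recommendation, sketched under assumptions: represent "no expiring duty" as `None` rather than as a far-future timestamp, so that the zero fallback is only reachable for deadlines that have genuinely passed. `std::time::SystemTime` stands in for chrono's `DateTime<Utc>`, and `time_until` is a hypothetical helper, not a function from the PR.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: how long to sleep until the next deadline.
/// `None` means no expiring duty is tracked, so the caller should wait
/// for new input instead of arming a timer.
fn time_until(next_deadline: Option<SystemTime>, now: SystemTime) -> Option<Duration> {
    let deadline = next_deadline?;
    // `duration_since` errors when the deadline is already in the past;
    // only in that case do we fall back to "fire immediately".
    Some(deadline.duration_since(now).unwrap_or(Duration::ZERO))
}

fn main() {
    let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000);

    // No tracked duty: no timer at all.
    assert_eq!(time_until(None, now), None);

    // Future deadline: sleep for the remaining time.
    let future = now + Duration::from_secs(12);
    assert_eq!(time_until(Some(future), now), Some(Duration::from_secs(12)));

    // Past deadline: expire right away.
    let past = now - Duration::from_secs(5);
    assert_eq!(time_until(Some(past), now), Some(Duration::ZERO));
}
```

In an event loop, the `None` case could be handled by awaiting only the input channel, or a future that never completes such as `std::future::pending()`, until a duty with a deadline arrives.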


[High] BoxFuture on trait instead of async-trait

deadline.rs:109 introduces BoxFuture<'_, bool> as the return type for Deadliner::add. The reviewer flagged this. async-trait is already in Cargo.toml as a workspace dependency — using #[async_trait] would be more idiomatic and consistent with the rest of the codebase.

Evidence: deadline.rs:99-116, crates/core/Cargo.toml:10


[High] Utc::now() is not mockable — timing logic cannot be unit tested deterministically

run_task calls Utc::now() directly at lines 295, 324, 343, 346, and 376. Tokio's test-util (tokio::time::pause/advance) only controls tokio::time, not chrono::Utc::now(). The reviewer explicitly flagged this and noted it already caused issues in QBFT.

The current test (test_deadliner) works around this by using real wall-clock time with short millisecond sleeps — fragile under CI load. To be properly mockable, the time source needs to be injectable (e.g., a Fn() -> DateTime<Utc> clock parameter) or the implementation should use tokio::time::Instant throughout so tokio::time::pause()/advance() can control it.

Evidence: deadline.rs:295, deadline.rs:324, deadline.rs:343, deadline.rs:346, deadline.rs:376
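The injectable-clock option mentioned above could look roughly like the following sketch. It is std-only, so `SystemTime` stands in for `DateTime<Utc>`, and `is_expired` is a hypothetical function rather than code from the PR; the point is only that the time source arrives as a parameter.

```rust
use std::cell::Cell;
use std::time::{Duration, SystemTime};

/// Hypothetical expiry check that receives its time source as a parameter
/// instead of reading the wall clock directly.
fn is_expired(deadline: SystemTime, clock: &dyn Fn() -> SystemTime) -> bool {
    clock() >= deadline
}

fn main() {
    // Test clock: a manually advanced timestamp, no real sleeping involved.
    let fake_now = Cell::new(SystemTime::UNIX_EPOCH);
    let clock = || fake_now.get();

    let deadline = SystemTime::UNIX_EPOCH + Duration::from_secs(12);
    assert!(!is_expired(deadline, &clock));

    // Advance the fake clock past the deadline.
    fake_now.set(fake_now.get() + Duration::from_secs(13));
    assert!(is_expired(deadline, &clock));

    // Production code would pass the real clock instead.
    let real_clock: fn() -> SystemTime = SystemTime::now;
    assert!(is_expired(SystemTime::UNIX_EPOCH, &real_clock));
}
```

The alternative named in the finding, using `tokio::time::Instant` throughout, avoids the extra parameter but ties deadlines to a monotonic clock rather than to UTC timestamps.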


[Medium] Misleading error variant for fetch_slots_config failure

new_duty_deadline_func maps both fetch_genesis_time and fetch_slots_config errors to DeadlineError::FetchGenesisTime. The second .map_err at line 134 incorrectly names the error:

// deadline.rs:131-134
let (slot_duration, _slots_per_epoch) = client
    .fetch_slots_config()
    .await
    .map_err(DeadlineError::FetchGenesisTime)?;  // wrong variant

Recommendation: Add a separate FetchSlotsConfig(#[from] ...) variant, or rename FetchGenesisTime to a more general BeaconNodeError.
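A minimal, std-only sketch of the first option. The enum here is a simplified stand-in for the PR's `DeadlineError` (which uses other payload types and, per the recommendation, could derive conversions with `#[from]`), and the two `fetch_*` functions are hypothetical stubs for the beacon node calls.

```rust
use std::fmt;

/// Simplified stand-in for the PR's error type, with one variant per call.
#[derive(Debug, PartialEq)]
enum DeadlineError {
    FetchGenesisTime(String),
    FetchSlotsConfig(String),
}

impl fmt::Display for DeadlineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::FetchGenesisTime(e) => write!(f, "fetch genesis time: {e}"),
            Self::FetchSlotsConfig(e) => write!(f, "fetch slots config: {e}"),
        }
    }
}

impl std::error::Error for DeadlineError {}

// Hypothetical stubs: the first call succeeds, the second one fails.
fn fetch_genesis_time() -> Result<u64, String> {
    Ok(1_000)
}

fn fetch_slots_config() -> Result<(u64, u64), String> {
    Err("connection refused".to_string())
}

fn load_config() -> Result<(u64, u64, u64), DeadlineError> {
    let genesis = fetch_genesis_time().map_err(DeadlineError::FetchGenesisTime)?;
    // Each call maps to its own variant, so the error names the real cause.
    let (slot_secs, slots_per_epoch) =
        fetch_slots_config().map_err(DeadlineError::FetchSlotsConfig)?;
    Ok((genesis, slot_secs, slots_per_epoch))
}

fn main() {
    let err = load_config().unwrap_err();
    assert_eq!(err, DeadlineError::FetchSlotsConfig("connection refused".to_string()));
    assert_eq!(err.to_string(), "fetch slots config: connection refused");
}
```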


[Medium] Unbounded input channel

new_deadliner at line 409 creates an unbounded MPSC channel for duty inputs. If callers add duties faster than the background task processes them (e.g., during startup catchup), memory can grow without bound.

Evidence: deadline.rs:409
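For comparison, a bounded channel makes the producer observe backpressure instead of letting the queue grow. The sketch below uses std's `sync_channel` to stay dependency-free; in tokio the bounded counterpart is `tokio::sync::mpsc::channel(capacity)`. The helper name and the capacity used are illustrative assumptions, not values from the PR.

```rust
use std::sync::mpsc::sync_channel;

/// Hypothetical helper: tries to enqueue `n` items while no consumer is
/// draining the channel, and returns how many were accepted.
fn accepted_without_consumer(capacity: usize, n: u64) -> usize {
    // Keep the receiver alive so sends fail with "full", not "disconnected".
    let (tx, _rx) = sync_channel::<u64>(capacity);
    (0..n).filter(|i| tx.try_send(*i).is_ok()).count()
}

fn main() {
    // With room for 2 items, the remaining 3 sends are rejected immediately
    // rather than being buffered without limit.
    assert_eq!(accepted_without_consumer(2, 5), 2);

    // A large enough buffer accepts everything.
    assert_eq!(accepted_without_consumer(8, 5), 5);
}
```

Whether a rejected `add` should block, drop the duty, or report failure to the caller is a design decision the finding leaves open.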


[Low] Test: voluntary_exits semantics are inverted vs. real behavior

In test_deadliner, the custom deadline_func gives DutyType::Exit duties a 1-hour future deadline (lines 532-535), so add() returns true. The test then asserts all "non-expired" duties including voluntary exits return true (lines 609-612).

But in the real new_duty_deadline_func, Exit duties return Ok(None) (never expire) and add() returns false. The test exercises the deadliner machinery correctly but uses naming/semantics that contradict the actual production behavior — a future reader may be misled about what Exit duties do.

Evidence: deadline.rs:529-535, deadline.rs:609-612


[Low] match with empty arm can be simplified

The early-return match at lines 141-146 can be simplified to an if let or matches! check:

// Current:
match duty.duty_type {
    DutyType::Exit | DutyType::BuilderRegistration => { return Ok(None); }
    _ => {}
}

// Simpler:
if matches!(duty.duty_type, DutyType::Exit | DutyType::BuilderRegistration) {
    return Ok(None);
}

Parity matrix (spot-checked)

| Component | Go behavior | Rust | Notes |
| --- | --- | --- | --- |
| Never-expire duties (Exit, BuilderRegistration) | returns nil | returns Ok(None) | |
| Proposer/Randao deadline | slotDuration/3 + margin | same | |
| SyncMessage deadline | 2*slotDuration/3 + margin | same | |
| Attester/Aggregator deadline | 2*slotDuration + margin | same | |
| Sentinel deadline (empty duties) | uses far-future time, loop suspends | overflows → zero duration, busy-loops | |
| add() idempotency | returns true if already tracked | same via HashSet::insert | |
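The per-duty formulas in the matrix can be worked through with a small sketch. Everything beyond the formulas themselves is an assumption: the enum is a cut-down stand-in for the PR's `DutyType` (only Exit and BuilderRegistration are confirmed variant names), `deadline_offset` is a hypothetical function, the offsets are assumed to be measured from the start of the duty's slot, and the 12 s slot and 1 s margin are illustrative inputs.

```rust
use std::time::Duration;

/// Cut-down stand-in for the PR's duty type enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DutyType {
    Proposer,
    Randao,
    SyncMessage,
    Attester,
    Aggregator,
    Exit,
    BuilderRegistration,
}

/// Offset after which a duty expires; `None` means it never expires.
fn deadline_offset(duty: DutyType, slot: Duration, margin: Duration) -> Option<Duration> {
    match duty {
        DutyType::Exit | DutyType::BuilderRegistration => None,
        DutyType::Proposer | DutyType::Randao => Some(slot / 3 + margin),
        DutyType::SyncMessage => Some(slot * 2 / 3 + margin),
        DutyType::Attester | DutyType::Aggregator => Some(slot * 2 + margin),
    }
}

fn main() {
    let slot = Duration::from_secs(12); // illustrative slot duration
    let margin = Duration::from_secs(1); // illustrative margin

    assert_eq!(deadline_offset(DutyType::Exit, slot, margin), None);
    // 12/3 + 1 = 5 s
    assert_eq!(deadline_offset(DutyType::Proposer, slot, margin), Some(Duration::from_secs(5)));
    // 2*12/3 + 1 = 9 s
    assert_eq!(deadline_offset(DutyType::SyncMessage, slot, margin), Some(Duration::from_secs(9)));
    // 2*12 + 1 = 25 s
    assert_eq!(deadline_offset(DutyType::Attester, slot, margin), Some(Duration::from_secs(25)));
}
```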

Tests

Tests were not run (no toolchain available in this context), but the test structure covers: never-expire duties, per-duty-type deadline calculations, and end-to-end deadliner behavior. The main concern is the real-time dependency in test_deadliner (see High finding above).
Branch: bohdan/deadline


Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement core/deadline.go

2 participants