Skip to content

Commit 2a0e674

Browse files
sanity and claude authored
fix(transport): add background ACK timer to prevent inter-hop delays (#2284)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 3a3db48 commit 2a0e674

File tree

3 files changed

+102
-1
lines changed

3 files changed

+102
-1
lines changed

crates/core/src/transport/peer_connection.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,24 @@ type Result<T = (), E = TransportError> = std::result::Result<T, E>;
3737
/// Measured overhead: 40 bytes (see symmetric_message::stream_fragment_overhead())
3838
const MAX_DATA_SIZE: usize = packet_data::MAX_DATA_SIZE - 40;
3939

40+
/// How often to check for pending ACKs and send them proactively.
41+
/// This prevents ACKs from being delayed when there's no outgoing traffic to piggyback on.
42+
///
43+
/// Set to the same value as MAX_CONFIRMATION_DELAY (100ms), which is the documented expectation from
44+
/// `ReceivedPacketTracker`. The sender's actual timeout is MESSAGE_CONFIRMATION_TIMEOUT
45+
/// (600ms = 100ms + 500ms network allowance), so 100ms provides ample margin.
46+
///
47+
/// Note: 50ms was tried initially but caused issues in Docker NAT test environments
48+
/// due to increased timer overhead. 100ms provides the right balance between
49+
/// responsiveness and system load.
50+
///
51+
/// Without this timer, and with no outgoing traffic to piggyback on, ACKs would only be sent when:
52+
/// 1. The receipt buffer fills up (20 packets)
53+
/// 2. MESSAGE_CONFIRMATION_TIMEOUT (600ms) expires on packet arrival
54+
///
55+
/// This caused ~600ms delays per hop for streams larger than 20 packets.
56+
const ACK_CHECK_INTERVAL: Duration = Duration::from_millis(100);
57+
4058
#[must_use]
4159
pub(crate) struct RemoteConnection {
4260
pub(super) outbound_packets: FastSender<(SocketAddr, Arc<[u8]>)>,
@@ -289,6 +307,16 @@ impl PeerConnection {
289307
let mut timeout_check = tokio::time::interval(Duration::from_secs(5));
290308
timeout_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
291309

310+
// Background ACK timer - sends pending ACKs proactively every 100ms
311+
// This prevents delays when there's no outgoing traffic to piggyback ACKs on
312+
// Use interval_at to delay the first tick - unlike the keep-alive task which can
313+
// block to skip its first tick, we're inside a select! loop so we delay instead
314+
let mut ack_check = tokio::time::interval_at(
315+
tokio::time::Instant::now() + ACK_CHECK_INTERVAL,
316+
ACK_CHECK_INTERVAL,
317+
);
318+
ack_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
319+
292320
const FAILURE_TIME_WINDOW: Duration = Duration::from_secs(30);
293321
loop {
294322
// tracing::trace!(remote = ?self.remote_conn.remote_addr, "waiting for inbound messages");
@@ -598,6 +626,19 @@ impl PeerConnection {
598626
}
599627
}
600628
}
629+
// Background ACK timer - proactively send pending ACKs
630+
// This prevents ACK delays when there's no outgoing traffic to piggyback on
631+
_ = ack_check.tick() => {
632+
let receipts = self.received_tracker.get_receipts();
633+
if !receipts.is_empty() {
634+
tracing::trace!(
635+
peer_addr = %self.remote_conn.remote_addr,
636+
receipt_count = receipts.len(),
637+
"Background ACK timer: sending pending receipts"
638+
);
639+
self.noop(receipts).await?;
640+
}
641+
}
601642
}
602643
}
603644
}
@@ -882,6 +923,56 @@ mod tests {
882923
*,
883924
};
884925
use crate::transport::packet_data::MAX_PACKET_SIZE;
926+
use crate::transport::received_packet_tracker::MAX_PENDING_RECEIPTS;
927+
use crate::transport::sent_packet_tracker::MAX_CONFIRMATION_DELAY;
928+
929+
/// Verify that ACK_CHECK_INTERVAL is properly configured relative to MAX_CONFIRMATION_DELAY.
930+
/// The ACK timer should ensure ACKs are sent within the sender's expected confirmation window.
931+
#[test]
932+
fn ack_check_interval_is_within_confirmation_window() {
933+
// ACK_CHECK_INTERVAL should not exceed MAX_CONFIRMATION_DELAY
934+
// to ensure receipts are sent within the allowed window
935+
assert!(
936+
ACK_CHECK_INTERVAL <= MAX_CONFIRMATION_DELAY,
937+
"ACK_CHECK_INTERVAL ({:?}) must not exceed MAX_CONFIRMATION_DELAY ({:?})",
938+
ACK_CHECK_INTERVAL,
939+
MAX_CONFIRMATION_DELAY
940+
);
941+
}
942+
943+
/// Verify that the ACK timer interval is reasonable (not too fast or too slow).
944+
#[test]
945+
fn ack_check_interval_is_reasonable() {
946+
// Should not be too fast (would cause excessive CPU usage)
947+
assert!(
948+
ACK_CHECK_INTERVAL >= Duration::from_millis(10),
949+
"ACK_CHECK_INTERVAL ({:?}) should be at least 10ms to avoid excessive CPU usage",
950+
ACK_CHECK_INTERVAL
951+
);
952+
953+
// Should not be too slow (would cause unnecessary delays)
954+
assert!(
955+
ACK_CHECK_INTERVAL <= Duration::from_millis(100),
956+
"ACK_CHECK_INTERVAL ({:?}) should be at most 100ms to ensure timely ACK delivery",
957+
ACK_CHECK_INTERVAL
958+
);
959+
}
960+
961+
/// Test that MAX_PENDING_RECEIPTS is appropriate for typical stream sizes.
962+
/// A 40KB stream splits into ~28 packets, so the buffer should be able to handle
963+
/// at least one batch before triggering an ACK send.
964+
#[test]
965+
fn pending_receipts_buffer_size_documented() {
966+
// Document the current buffer size - this affects when buffer-full ACKs are sent
967+
// With MAX_PENDING_RECEIPTS = 20 and typical streams of ~28 packets:
968+
// - First 20 packets: buffer fills, ACK sent
969+
// - Remaining 8 packets: rely on timer for ACK delivery
970+
// The background ACK timer ensures these remaining packets get ACKed within 100ms
971+
assert_eq!(
972+
MAX_PENDING_RECEIPTS, 20,
973+
"MAX_PENDING_RECEIPTS changed - verify ACK timing behavior is still correct"
974+
);
975+
}
885976

886977
#[tokio::test]
887978
async fn test_inbound_outbound_interaction() -> Result<(), Box<dyn std::error::Error>> {

crates/core/src/transport/received_packet_tracker.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ use std::time::{Duration, Instant};
66

77
/// How long to retain packets in case they need to be retransmitted
88
const RETAIN_TIME: Duration = Duration::from_secs(60);
9+
10+
/// Maximum number of pending receipts before forcing an ACK send.
11+
/// When this many packets have been received without sending ACKs,
12+
/// the receiver will send a NOOP with all pending receipts.
13+
#[cfg(test)]
14+
pub(crate) const MAX_PENDING_RECEIPTS: usize = 20;
15+
#[cfg(not(test))]
916
const MAX_PENDING_RECEIPTS: usize = 20;
1017

1118
/// This struct is responsible for tracking received packets and deciding when to send receipts
@@ -73,7 +80,7 @@ impl<T: TimeSource> ReceivedPacketTracker<T> {
7380

7481
/// Returns a list of packets that have been received since the last call to this function.
7582
/// This should be called every time a packet is sent to ensure that receipts are sent
76-
/// promptly. Every `MAX_CONFIRMATION_DELAY` (50ms) this should be called and if the returned
83+
/// promptly. Every `MAX_CONFIRMATION_DELAY` (100ms) this should be called and if the returned
7784
/// list is not empty, the list should be sent as receipts immediately in a noop packet.
7885
pub(super) fn get_receipts(&mut self) -> Vec<PacketId> {
7986
self.cleanup();

crates/core/src/transport/sent_packet_tracker.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ const NETWORK_DELAY_ALLOWANCE: Duration = Duration::from_millis(500);
88

99
/// We can wait up to 100ms to confirm a message was received; this allows us to batch
1010
/// receipts together and send them in a single message.
11+
#[cfg(test)]
12+
pub(crate) const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);
13+
#[cfg(not(test))]
1114
const MAX_CONFIRMATION_DELAY: Duration = Duration::from_millis(100);
1215

1316
/// If we don't get a receipt for a message within 500ms, we assume the message was lost and

0 commit comments

Comments
 (0)