
feat(cluster): implement replica bootstrap #3163

Open
numinnex wants to merge 7 commits into master from replica_bootstrap

Conversation

@numinnex
Contributor

Implements replica bootstrap logic. It does not yet handle replicas that are out of sync (that needs State Transfer, which is not implemented yet).

@codecov

codecov Bot commented Apr 24, 2026

Codecov Report

❌ Patch coverage is 9.74359% with 1232 lines in your changes missing coverage. Please review.
✅ Project coverage is 71.29%. Comparing base (e937852) to head (991523b).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/server-ng/src/bootstrap.rs | 0.00% | 1057 Missing ⚠️ |
| core/server-ng/src/config_writer.rs | 0.00% | 71 Missing ⚠️ |
| core/server-ng/src/main.rs | 0.00% | 33 Missing ⚠️ |
| core/common/src/sharding/namespace.rs | 73.77% | 14 Missing and 2 partials ⚠️ |
| core/common/src/types/streaming_stats.rs | 50.00% | 12 Missing ⚠️ |
| core/configs/src/server_ng_config/validators.rs | 41.17% | 7 Missing and 3 partials ⚠️ |
| core/consensus/src/impls.rs | 0.00% | 10 Missing ⚠️ |
| core/configs/src/server_ng_config/displays.rs | 0.00% | 7 Missing ⚠️ |
| core/metadata/src/stm/stream.rs | 0.00% | 5 Missing ⚠️ |
| core/shard/src/router.rs | 0.00% | 5 Missing ⚠️ |

... and 2 more

❌ Your patch check has failed because the patch coverage (9.74%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
```diff
@@             Coverage Diff              @@
##             master    #3163      +/-   ##
============================================
- Coverage     74.47%   71.29%   -3.19%
  Complexity      943      943
============================================
  Files          1183     1184       +1
  Lines        105866   102280    -3586
  Branches      82899    79328    -3571
============================================
- Hits          78846    72921    -5925
- Misses        24275    26413    +2138
- Partials       2745     2946     +201
```
| Components | Coverage Δ |
|---|---|
| Rust Core | 71.69% <9.74%> (-4.07%) ⬇️ |
| Java SDK | 60.14% <ø> (ø) |
| C# SDK | 69.07% <ø> (-0.31%) ⬇️ |
| Python SDK | 81.43% <ø> (ø) |
| Node SDK | 91.44% <ø> (-0.10%) ⬇️ |
| Go SDK | 39.60% <ø> (ø) |
| Files with missing lines | Coverage Δ |
|---|---|
| core/configs/src/server_config/server.rs | 80.28% <100.00%> (+6.20%) ⬆️ |
| core/configs/src/server_ng_config/defaults.rs | 100.00% <100.00%> (ø) |
| core/configs/src/server_ng_config/server_ng.rs | 47.05% <100.00%> (+6.07%) ⬆️ |
| core/message_bus/src/replica/io.rs | 83.09% <100.00%> (+0.18%) ⬆️ |
| core/metadata/src/stm/mod.rs | 41.50% <100.00%> (+1.70%) ⬆️ |
| core/metadata/src/stm/mux.rs | 92.63% <100.00%> (+0.96%) ⬆️ |
| core/partitions/src/iggy_index_writer.rs | 0.00% <ø> (ø) |
| core/partitions/src/lib.rs | 0.00% <ø> (ø) |
| core/partitions/src/messages_writer.rs | 0.00% <ø> (ø) |
| core/message_bus/src/lib.rs | 92.08% <0.00%> (-0.82%) ⬇️ |

... and 11 more

... and 112 files with indirect coverage changes


numinnex force-pushed the replica_bootstrap branch from 39ce6cb to a1d20d7 on May 4, 2026 at 13:18
Contributor

@hubcio left a comment


out-of-diff findings:

core/partitions/src/messages_writer.rs:60-63: `let _ = file.sync_all().await.map_err(...)` swallows the IO error on the file_exists open path. EIO at boot indicates dying media, but bootstrap proceeds with a stale view of disk. already on master HEAD, but server-ng makes this path live for every partition. propagate the error.

core/partitions/src/iggy_index_writer.rs:53: `let _ = file.sync_all().await;` same pattern, with no map_err at all. propagate.
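
a minimal sketch of the propagation both sync_all findings ask for; illustrated with tokio::fs here since the real writers use compio, so treat the handle type and function name as assumptions:

```rust
use tokio::fs::File;

// before (paraphrased): `let _ = file.sync_all().await...` discards the
// Err arm, so EIO at boot never aborts bootstrap.
// after: bubble the IO error up so dying media stops the boot.
async fn sync_existing_file(file: &File) -> std::io::Result<()> {
    file.sync_all().await?; // propagate instead of swallowing
    Ok(())
}
```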

core/partitions/src/messages_writer.rs:137: `let chunk_vec: Vec<_> = chunk.to_vec();` allocates a Vec per chunk per save_frozen_batches call - a steady-state per-batch allocation on what is now the primary write path. cache a reusable Vec in the writer or refactor the compio iov call to take a borrowed slice.
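
a sketch of the cached-buffer option under assumed names (`ChunkStager` is illustrative, not the real writer type):

```rust
// one buffer reused across chunks instead of a fresh `chunk.to_vec()`
// allocation per chunk; warm after the first batch.
struct ChunkStager {
    chunk_buf: Vec<u8>,
}

impl ChunkStager {
    fn stage(&mut self, chunk: &[u8]) -> &[u8] {
        self.chunk_buf.clear();                  // keeps capacity
        self.chunk_buf.extend_from_slice(chunk); // no steady-state alloc
        &self.chunk_buf
    }
}
```

the same clear-and-extend pattern covers the namespaces_buf hoist in the next finding.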

core/shard/src/lib.rs:622: `let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect();` allocates a fresh Vec per inbox-frame iteration, and core/shard/src/router.rs:270 calls process_loopback after every dispatched frame in steady state. hoist a `namespaces_buf: Vec<u64>` like the existing loopback_buf, or short-circuit when the partitions and drained-loopback sets are empty.

core/shard/src/router.rs:94-103 and 199-208: `shards_table.shard_for(...).unwrap_or_else(|| { warn!(...); 0 })` silently falls back to shard 0. single-shard server-ng masks this today; with multiple shards it becomes a silent routing-bug hider. drop the fallback or fail loud.
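
a sketch of the fail-loud variant, with the lookup signature assumed:

```rust
// surface a routing error instead of warning and defaulting to shard 0.
fn shard_for_checked(
    lookup: impl Fn(u64) -> Option<usize>, // stands in for shards_table.shard_for
    namespace: u64,
) -> Result<usize, String> {
    lookup(namespace).ok_or_else(|| {
        format!("no shard mapping for namespace {namespace}; shards table inconsistent")
    })
}
```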

core/binary_protocol/src/consensus/message.rs:347: `transmute_header` does a 256B header copy + 256B memset + `Message::try_from(owned)` re-validation after the closure already wrote a known-valid header. that's a third validation per request (1st at transport recv, 2nd in try_into_typed, 3rd here), reached per client message via the new make_deferred_*_handler closures at core/server-ng/src/bootstrap.rs:1086,1116. add a transmute_header_unchecked that skips the trailing TryFrom validation and relies on the closure invariant.
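
a hypothetical shape for the unchecked variant; the 256-byte header size is taken from the finding, everything else is assumed:

```rust
/// # Safety
/// `f` must leave `header` containing a valid `Message` encoding; the
/// transport-recv and `try_into_typed` validations already ran for this
/// request, so the closure invariant is the only remaining obligation.
unsafe fn transmute_header_unchecked(
    header: &mut [u8; 256],
    f: impl FnOnce(&mut [u8; 256]),
) {
    f(header);
    // no trailing `Message::try_from(owned)`: no 256B copy, no memset,
    // no third validation per request
}
```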

core/metadata/src/stm/stream.rs:615-676: `Snapshotable::to_snapshot` reads round_robin_counter with Ordering::Relaxed during snapshot fill, so concurrent producer increments race the snapshot read. single-shard today, so not exploitable. document the invariant.

core/common/src/types/streaming_stats.rs counter ordering: counter fetch_add / fetch_sub use Ordering::AcqRel even though the counters synchronize no other data. Relaxed is correct; AcqRel forces unnecessary ldar / stlr fences on aarch64, on the per-batch produce path. confirmed via method names like _inconsistent and load_for_snapshot that no consumer relies on the AcqRel half.
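
a sketch of the Relaxed counters (type and field names illustrative):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// a plain event counter that publishes no other data needs no
// acquire/release pairing; Relaxed avoids the ldar/stlr fences on aarch64.
struct StreamingCounter(AtomicU64);

impl StreamingCounter {
    fn add(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    fn load_for_snapshot(&self) -> u64 {
        // monotonic but possibly slightly stale, as the method name admits
        self.0.load(Ordering::Relaxed)
    }
}
```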

```rust
)
.await?;

let current_offset = partition
```

current_offset here is derived from `max(end_offset where size > 0)`, and end_offset comes from the segment metadata index. if the previous run wrote messages but index entries weren't fsynced before a crash, end_offset lags the actual messages.log bytes. the next append then assigns logical offsets that overlap pre-crash messages already on disk - duplicate-offset corruption visible to all SDK clients. derive current_offset from the messages-side size plus tail replay, or fail fast on index/messages length divergence.
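
a sketch of the fail-fast half of that suggestion; the byte-accounting inputs are assumptions:

```rust
// refuse to derive current_offset from the index when messages.log holds
// bytes the index does not account for.
fn check_index_covers_log(index_covered_bytes: u64, messages_log_len: u64) -> Result<(), String> {
    if messages_log_len > index_covered_bytes {
        return Err(format!(
            "messages.log has {messages_log_len} bytes but the index covers only \
             {index_covered_bytes}; index lags the log (entries not fsynced before \
             a crash?) - refusing to assign overlapping offsets"
        ));
    }
    Ok(())
}
```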

```rust
let consumer_offsets = ConsumerOffsets::with_capacity(loaded_consumer_offsets.len());
{
    let guard = consumer_offsets.pin();
    for offset in loaded_consumer_offsets {
```

also lines 647-650. silent down-clamp of persisted consumer/group offsets to current_offset. if recovered current_offset < persisted consumer offset, the partition log was truncated/lost relative to consumer state, but the clamp causes the consumer to re-deliver an already-acked range without warning. that breaks at-least-once. fix: detect divergence and either surface an error or require explicit operator opt-in via config.
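
a sketch of the divergence check; the opt-in flag is hypothetical:

```rust
// error out instead of silently clamping a persisted offset past the
// recovered log, unless the operator explicitly opted in via config.
fn restore_consumer_offset(
    persisted: u64,
    current_offset: u64,
    allow_redelivery: bool, // hypothetical config knob
) -> Result<u64, String> {
    if persisted > current_offset && !allow_redelivery {
        return Err(format!(
            "persisted consumer offset {persisted} exceeds recovered partition \
             offset {current_offset}; log truncated relative to consumer state"
        ));
    }
    Ok(persisted.min(current_offset))
}
```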

```rust
let topic_id = namespace.topic_id();
let partition_id = namespace.partition_id();
let stats = Arc::new(PartitionStats::new(topic_stats));
let consensus = VsrConsensus::new(
```

per-partition VsrConsensus::new() + init() boots fresh-zero (commit_min/commit_max/sequencer/view all 0). no restore_commit_state, no view restore. today this is vacuous because MemoryMessageJournal is volatile, but as soon as a durable partition journal lands, ack-but-not-yet-applied ops are silently lost on restart. either stub a hard-error if the partition journal contains a tail past snapshot, or implement matching restore now so the call shape exists.
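
a sketch of the stub hard-error option (op accounting assumed):

```rust
// refuse to boot while a durable partition journal holds ops past the
// snapshot, since there is no commit/view restore to replay them yet.
fn check_partition_journal_tail(journal_max_op: u64, snapshot_op: u64) -> Result<(), String> {
    if journal_max_op > snapshot_op {
        return Err(format!(
            "partition journal reaches op {journal_max_op} but snapshot stops at \
             {snapshot_op}; consensus restore is unimplemented - refusing to drop \
             acked-but-unapplied ops"
        ));
    }
    Ok(())
}
```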

```rust
) {
    let index_path = index_reader.path();
    let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len());
    partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
```

hydrate_partition_log re-opens active segment files via OpenOptions::create(true).write(true) with file_exists=true hardcoded at lines 527-541. if the file vanished between load_segments and writer init, metadata().len() returns 0 and the size atomic resets to 0, but in-memory segment.end_offset still holds the stale value - next append writes at file pos 0 with logical offset N+1. file-pos / logical-offset mismatch, segment metadata diverges from file content. open with create=false when file_exists=true and fail loudly if missing.
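
a sketch of the suggested open, shown with std::fs option names (the real writer uses compio; the option shape is the same):

```rust
use std::fs::{File, OpenOptions};
use std::path::Path;

// when file_exists is known to be true, never create: a vanished segment
// file becomes a loud NotFound instead of a silent size-0 reset.
fn open_existing_segment(path: &Path) -> std::io::Result<File> {
    OpenOptions::new()
        .create(false) // fail if the file disappeared since load_segments
        .write(true)
        .open(path)
}
```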

```rust
let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
    .await
    .map_err(ServerNgError::MetadataRecovery)?;
let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
```

restored_op = snapshot.sequence_number() when last_applied_op = None is correct only because IggySnapshot::persist (core/metadata/src/impls/metadata.rs:91-130) is atomic (tmp + fsync + rename + dir fsync). that's an implicit contract. add a doc-comment naming the invariant so future snapshot persistence changes don't quietly break this fallback path.
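
a sketch of the requested doc-comment, wording assumed:

```rust
// INVARIANT: falling back to snapshot.sequence_number() when
// last_applied_op is None is sound only because IggySnapshot::persist
// (core/metadata/src/impls/metadata.rs:91-130) writes atomically
// (tmp file + fsync + rename + dir fsync), so any snapshot visible on
// disk reflects a fully applied op. If snapshot persistence ever becomes
// non-atomic, this fallback can restore from a torn snapshot.
```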

```rust
pub consumer_group: ConsumerGroupConfig,
pub data_maintenance: DataMaintenanceConfig,
#[serde(default)]
pub extra: ExtraConfig,
```

[extra] TOML section name is unprecedented in this codebase. zero matches in legacy config.toml, no pub extra field anywhere else. the only field inside is namespace, so ExtraConfig is a single-field wrapper. operator-facing schema decision worth getting right before this lands. rename to [system.namespace] (sibling of system.sharding, which already lives there) or top-level [namespace]; drop the ExtraConfig wrapper.
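
a sketch of the suggested shape; the config types are stubbed here for illustration:

```rust
use serde::Deserialize;

// namespace as a direct sibling of system.sharding, no single-field
// ExtraConfig wrapper - i.e. [system.namespace] in TOML.
#[derive(Deserialize)]
pub struct SystemConfig {
    pub sharding: ShardingConfig,
    #[serde(default)]
    pub namespace: NamespaceConfig, // was: [extra] with `namespace` inside
}

#[derive(Deserialize, Default)]
pub struct ShardingConfig {} // stub for the sketch

#[derive(Deserialize, Default)]
pub struct NamespaceConfig {} // stub for the sketch
```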

```rust
    .await
}

pub async fn load_with_path(
```

also config_provider_with_default at line 222. new public API with no external callers. ServerNgConfig::load reimplements path/include logic at core/configs/src/server_ng_config/server_ng.rs:116-131 instead of delegating to these helpers. all 5 grep hits are internal to this file. either route ServerNgConfig::load through the new helpers, or revert these methods - right now they pay public-API tax for zero callers.

```rust
    commit_min <= commit_max,
    "commit_min ({commit_min}) must be <= commit_max ({commit_max})"
);
assert_eq!(
```

restore_commit_state asserts commit_min == 0 && commit_max == 0 on entry. today fine, but if VsrConsensus::new ever changes its initial commit defaults (e.g., for recovery hooks), the asserts break silently in tests but mask corruption in prod. replace with a Builder pattern, or a restored: Cell<bool> guard set on first restore so the contract is enforced by construction.
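
a sketch of the one-shot guard:

```rust
use std::cell::Cell;

// "restore at most once" enforced by a flag flipped on first call,
// instead of asserting that the counters still hold their zeroed defaults.
struct CommitState {
    commit_min: Cell<u64>,
    commit_max: Cell<u64>,
    restored: Cell<bool>,
}

impl CommitState {
    fn restore(&self, commit_min: u64, commit_max: u64) {
        assert!(
            !self.restored.replace(true),
            "restore_commit_state called more than once"
        );
        assert!(commit_min <= commit_max);
        self.commit_min.set(commit_min);
        self.commit_max.set(commit_max);
    }
}
```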

```rust
    .await?;

// 4. Replay journal entries after snapshot
let headers_to_replay = journal.iter_headers_from(replay_from);
```

replay loop mux_stm.update(entry)? aborts on first error and recover() returns Err, bootstrap fails. the journal still contains those entries, so an operator restart hits the same error - node is permanently un-bootable until the journal is manually truncated. there's no skip-and-repair-via-VSR path and no contiguity check on the WAL. compounds with PrepareJournal SLOT_COUNT = 1024 silent eviction. confirm intentional fail-fast for now and document the repair path; longer-term, walk the hash chain and stop at the first break.
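
a sketch of the longer-term hash-chain walk; the header fields are assumed:

```rust
// replay only the contiguous prefix and stop at the first break instead
// of failing the whole boot on the first bad entry.
struct EntryHeader {
    checksum: u64,
    parent_checksum: u64, // checksum of the previous entry in the chain
}

fn contiguous_prefix_len(headers: &[EntryHeader], mut parent: u64) -> usize {
    let mut len = 0;
    for header in headers {
        if header.parent_checksum != parent {
            break; // chain broken: repair/truncate everything past this point
        }
        parent = header.checksum;
        len += 1;
    }
    len
}
```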

```rust
    bits_required((max_partitions - 1) as u64)
};

let required_bits = stream_bits
```

ExceedsU64 { required_bits: u32::MAX } placeholder is unreachable. bits_required returns u32 ≤ 64 (loop bounded by u64::BITS at lines 28-38); checked_add of three values each ≤ 64 has max sum 192, cannot overflow u32. drop the checked_add branch and sum directly - the required_bits > u64::BITS check below is the real guard.
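
a sketch of the direct sum:

```rust
// each term is <= 64 (bits_required is bounded by u64::BITS), so the u32
// sum tops out at 192 and cannot overflow; the final comparison against
// u64::BITS is the only guard needed.
fn total_namespace_bits(stream_bits: u32, topic_bits: u32, partition_bits: u32) -> Result<u32, String> {
    let required_bits = stream_bits + topic_bits + partition_bits;
    if required_bits > u64::BITS {
        return Err(format!("namespace requires {required_bits} bits, exceeding u64"));
    }
    Ok(required_bits)
}
```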
