Codecov Report

❌ Patch check failed: patch coverage (9.74%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files:

@@ Coverage Diff @@
## master #3163 +/- ##
============================================
- Coverage 74.47% 71.29% -3.19%
Complexity 943 943
============================================
Files 1183 1184 +1
Lines 105866 102280 -3586
Branches 82899 79328 -3571
============================================
- Hits 78846 72921 -5925
- Misses 24275 26413 +2138
- Partials 2745 2946 +201
Force-pushed from 39ce6cb to a1d20d7.
hubcio left a comment:
out-of-diff findings:
core/partitions/src/messages_writer.rs:60-63 — let _ = file.sync_all().await.map_err(...) swallows IO error on the file_exists open path. EIO at boot indicates dying media, but bootstrap proceeds with a stale view of disk. on master HEAD already, but server-ng makes this path live for every partition. propagate the error.
core/partitions/src/iggy_index_writer.rs:53 — let _ = file.sync_all().await; same pattern, no map_err at all. propagate.
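The propagation fix for both sync_all call sites can be sketched like this, using the synchronous std `File` as a stand-in for the async handle (the `sync_segment` helper name is invented for illustration):

```rust
use std::fs::File;
use std::io;

// Hedged sketch: surface fsync failures with `?` instead of discarding them.
fn sync_segment(file: &File) -> io::Result<()> {
    // before: `let _ = file.sync_all().await...` dropped the error
    // after: EIO at boot (dying media) aborts bootstrap instead of
    // proceeding with a stale view of disk
    file.sync_all()?;
    Ok(())
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("sync_sketch.tmp");
    let file = File::create(&path)?;
    sync_segment(&file)?;
    std::fs::remove_file(&path)?;
    println!("fsync error propagated, not swallowed");
    Ok(())
}
```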
core/partitions/src/messages_writer.rs:137 — let chunk_vec: Vec<_> = chunk.to_vec(); allocates a Vec per chunk per save_frozen_batches. steady-state per-batch alloc on what is now the primary write path. cache a reusable Vec in the writer or refactor the compio iov call to take a borrowed slice.
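A minimal sketch of the buffer-reuse suggestion, with `MessagesWriter` and `write_chunk` as invented stand-ins for the real writer and its compio iov call:

```rust
// Hypothetical sketch: the writer keeps one scratch Vec and refills it per
// chunk, so the steady-state write path stops allocating a Vec per batch.
struct MessagesWriter {
    scratch: Vec<u8>, // reused across save_frozen_batches calls
}

impl MessagesWriter {
    fn new() -> Self {
        Self { scratch: Vec::new() }
    }

    // Stand-in for the iov submission: copies into the reused buffer.
    fn write_chunk(&mut self, chunk: &[u8]) -> usize {
        self.scratch.clear(); // keeps capacity, drops contents
        self.scratch.extend_from_slice(chunk);
        self.scratch.len()
    }
}

fn main() {
    let mut writer = MessagesWriter::new();
    writer.write_chunk(&[0u8; 4096]);
    let cap = writer.scratch.capacity();
    writer.write_chunk(&[1u8; 1024]); // capacity retained, no reallocation
    assert!(writer.scratch.capacity() >= cap);
    println!("chunk buffer reused without per-batch allocation");
}
```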
core/shard/src/lib.rs:622 — let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect(); allocates a fresh Vec per inbox-frame iteration. core/shard/src/router.rs:270 calls process_loopback after every dispatched frame in steady state. hoist a namespaces_buf: Vec<u64> like the existing loopback_buf, or short-circuit when partitions / loopback drained are empty.
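The hoisted-buffer variant with the short-circuit could look like this; the `collect_namespaces` helper and the plain `&[u64]` input are invented for illustration:

```rust
// Sketch: fill a long-lived buffer each iteration instead of collecting a
// fresh Vec, and return early when there is nothing drained.
fn collect_namespaces(namespaces_buf: &mut Vec<u64>, live: &[u64]) -> usize {
    if live.is_empty() {
        return 0; // short-circuit: partitions / loopback drained are empty
    }
    namespaces_buf.clear(); // reuse capacity like the existing loopback_buf
    namespaces_buf.extend_from_slice(live);
    namespaces_buf.len()
}

fn main() {
    let mut namespaces_buf: Vec<u64> = Vec::new();
    assert_eq!(collect_namespaces(&mut namespaces_buf, &[]), 0);
    assert_eq!(collect_namespaces(&mut namespaces_buf, &[7, 8]), 2);
    // subsequent frames reuse the same allocation
    assert_eq!(collect_namespaces(&mut namespaces_buf, &[9]), 1);
    println!("namespaces buffer hoisted out of the frame loop");
}
```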
core/shard/src/router.rs:94-103 and 199-208 — shards_table.shard_for(...).unwrap_or_else(|| { warn!(...); 0 }) silently falls back to shard 0. single-shard server-ng masks this today; multi-shard makes it a silent routing-bug hider. drop the fallback or fail loud.
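The fail-loud shape might look like the following sketch; `ShardsTable`, `RouteError`, and the modulo routing are illustrative stand-ins, not the real implementation:

```rust
// Sketch: make a routing miss an error instead of `unwrap_or_else` silently
// falling back to shard 0.
#[derive(Debug, PartialEq)]
enum RouteError {
    NoShardFor(u64),
}

struct ShardsTable {
    shard_count: u64,
}

impl ShardsTable {
    fn shard_for(&self, key: u64) -> Option<u64> {
        (self.shard_count > 0).then(|| key % self.shard_count)
    }

    // fail loud: a multi-shard routing bug surfaces immediately
    fn shard_for_strict(&self, key: u64) -> Result<u64, RouteError> {
        self.shard_for(key).ok_or(RouteError::NoShardFor(key))
    }
}

fn main() {
    let empty = ShardsTable { shard_count: 0 };
    assert_eq!(empty.shard_for_strict(42), Err(RouteError::NoShardFor(42)));
    let table = ShardsTable { shard_count: 4 };
    assert_eq!(table.shard_for_strict(42), Ok(2));
    println!("routing miss fails loud instead of defaulting to shard 0");
}
```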
core/binary_protocol/src/consensus/message.rs:347 — transmute_header does a 256B header copy + 256B memset + Message::try_from(owned) re-validate after the closure already wrote a known-valid header. that's a 3rd validation per request (1st transport recv, 2nd try_into_typed, 3rd here). reached per-client-message via the new make_deferred_*_handler closures at core/server-ng/src/bootstrap.rs:1086,1116. add a transmute_header_unchecked that skips the trailing TryFrom validation and rely on the closure invariant.
core/metadata/src/stm/stream.rs:615-676 — Snapshotable::to_snapshot reads round_robin_counter with Ordering::Relaxed during snapshot fill. concurrent producer increments race the snapshot read. single-shard today: not exploitable. document the invariant.
core/common/src/types/streaming_stats.rs counter ordering — counter fetch_add / fetch_sub use Ordering::AcqRel even though the counters synchronize no other data. Relaxed is correct; AcqRel forces unnecessary ldar / stlr fences on aarch64. per-batch produce path. confirmed via method names like _inconsistent and load_for_snapshot that no consumer relies on the AcqRel half.
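The ordering change can be sketched as below; `StreamingStats`, `record`, and `load_for_snapshot` are illustrative names, not the real API:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch: a pure statistics counter that synchronizes no other data can use
// Relaxed, avoiding the acquire/release fences AcqRel emits on aarch64.
struct StreamingStats {
    messages: AtomicU64,
}

impl StreamingStats {
    fn record(&self, n: u64) {
        // Relaxed: only the counter value matters; no happens-before edge
        // with any other memory location is required.
        self.messages.fetch_add(n, Ordering::Relaxed);
    }

    fn load_for_snapshot(&self) -> u64 {
        self.messages.load(Ordering::Relaxed)
    }
}

fn main() {
    let stats = StreamingStats { messages: AtomicU64::new(0) };
    stats.record(3);
    stats.record(4);
    assert_eq!(stats.load_for_snapshot(), 7);
    println!("relaxed counters, no redundant fences");
}
```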
On:

    let current_offset = partition
current_offset here is derived from max(end_offset where size > 0). end_offset comes from the segment metadata index. if the previous run wrote messages but index entries weren't fsynced before a crash, end_offset lags actual messages.log bytes. next append assigns logical offsets that overlap pre-crash messages already on disk - duplicate-offset corruption visible to all SDK clients. derive current_offset from messages-side size + tail-replay, or fail-fast on index/messages length divergence.
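The fail-fast option can be sketched as follows; all names and the fixed record size are invented for illustration (real log records are variable-length, so a production version would tail-replay rather than divide):

```rust
// Sketch: compare the end offset implied by the index with the offset implied
// by messages.log's byte length, and refuse to boot on divergence.
#[derive(Debug, PartialEq)]
enum RecoveryError {
    IndexBehindLog { index_end: u64, log_end: u64 },
}

fn recover_current_offset(
    index_end_offset: u64,
    log_len_bytes: u64,
    record_size: u64,
) -> Result<u64, RecoveryError> {
    let log_end_offset = log_len_bytes / record_size;
    if log_end_offset > index_end_offset {
        // index entries were not fsynced before the crash: appending from
        // index_end_offset would assign offsets already present on disk
        return Err(RecoveryError::IndexBehindLog {
            index_end: index_end_offset,
            log_end: log_end_offset,
        });
    }
    Ok(index_end_offset)
}

fn main() {
    assert_eq!(recover_current_offset(100, 100 * 64, 64), Ok(100));
    assert_eq!(
        recover_current_offset(100, 105 * 64, 64),
        Err(RecoveryError::IndexBehindLog { index_end: 100, log_end: 105 })
    );
    println!("index/messages divergence detected, not papered over");
}
```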
On:

    let consumer_offsets = ConsumerOffsets::with_capacity(loaded_consumer_offsets.len());
    {
        let guard = consumer_offsets.pin();
        for offset in loaded_consumer_offsets {
also lines 647-650. silent down-clamp of persisted consumer/group offsets to current_offset. if recovered current_offset < persisted consumer offset, the partition log was truncated/lost relative to consumer state, but the clamp causes the consumer to re-deliver an already-acked range without warning. that breaks at-least-once. fix: detect divergence and either surface an error or require explicit operator opt-in via config.
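Divergence detection with explicit opt-in could look like this sketch; the error type and the `allow_clamp_on_truncation` flag are illustrative:

```rust
// Sketch: instead of silently clamping a persisted consumer offset down to
// current_offset, return an error unless the operator explicitly opted in.
#[derive(Debug, PartialEq)]
enum OffsetError {
    LogTruncated { persisted: u64, current: u64 },
}

fn restore_consumer_offset(
    persisted: u64,
    current_offset: u64,
    allow_clamp_on_truncation: bool,
) -> Result<u64, OffsetError> {
    if persisted > current_offset {
        // the log was truncated/lost relative to consumer state; clamping
        // silently re-delivers an already-acked range
        if allow_clamp_on_truncation {
            return Ok(current_offset);
        }
        return Err(OffsetError::LogTruncated { persisted, current: current_offset });
    }
    Ok(persisted)
}

fn main() {
    assert_eq!(restore_consumer_offset(10, 50, false), Ok(10));
    assert_eq!(
        restore_consumer_offset(80, 50, false),
        Err(OffsetError::LogTruncated { persisted: 80, current: 50 })
    );
    assert_eq!(restore_consumer_offset(80, 50, true), Ok(50));
    println!("offset clamp now requires explicit opt-in");
}
```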
On:

    let topic_id = namespace.topic_id();
    let partition_id = namespace.partition_id();
    let stats = Arc::new(PartitionStats::new(topic_stats));
    let consensus = VsrConsensus::new(
per-partition VsrConsensus::new() + init() boots fresh-zero (commit_min/commit_max/sequencer/view all 0). no restore_commit_state, no view restore. today this is vacuous because MemoryMessageJournal is volatile, but as soon as a durable partition journal lands, ack-but-not-yet-applied ops are silently lost on restart. either stub a hard-error if the partition journal contains a tail past snapshot, or implement matching restore now so the call shape exists.
On:

    ) {
        let index_path = index_reader.path();
        let index_size = std::fs::metadata(&index_path).map_or(0, |metadata| metadata.len());
        partition.log.messages_writers_mut()[active_index] = Some(Rc::new(
hydrate_partition_log re-opens active segment files via OpenOptions::create(true).write(true) with file_exists=true hardcoded at lines 527-541. if the file vanished between load_segments and writer init, metadata().len() returns 0 and the size atomic resets to 0, but in-memory segment.end_offset still holds the stale value - next append writes at file pos 0 with logical offset N+1. file-pos / logical-offset mismatch, segment metadata diverges from file content. open with create=false when file_exists=true and fail loudly if missing.
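The suggested open mode is a small change; a sketch with std's `OpenOptions` (the helper name is invented):

```rust
use std::fs::OpenOptions;
use std::io;

// Sketch: when file_exists = true, open with create(false) so a segment that
// vanished between load_segments and writer init is a loud NotFound instead
// of a silently re-created empty file at position 0.
fn open_existing_segment(path: &std::path::Path) -> io::Result<std::fs::File> {
    OpenOptions::new()
        .write(true)
        .create(false) // fail if missing; never reset the size atomic to 0
        .open(path)
}

fn main() {
    let missing = std::env::temp_dir().join("vanished-segment.log");
    let _ = std::fs::remove_file(&missing); // ensure it does not exist
    let err = open_existing_segment(&missing).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::NotFound);
    println!("vanished segment fails loudly instead of re-creating at pos 0");
}
```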
On:

    let recovered = recover::<ServerNgMuxStateMachine>(Path::new(&config.system.path))
        .await
        .map_err(ServerNgError::MetadataRecovery)?;
    let restored_op = recovered.last_applied_op.unwrap_or_else(|| {
restored_op = snapshot.sequence_number() when last_applied_op = None is correct only because IggySnapshot::persist (core/metadata/src/impls/metadata.rs:91-130) is atomic (tmp + fsync + rename + dir fsync). that's an implicit contract. add a doc-comment naming the invariant so future snapshot persistence changes don't quietly break this fallback path.
On:

    pub consumer_group: ConsumerGroupConfig,
    pub data_maintenance: DataMaintenanceConfig,
    #[serde(default)]
    pub extra: ExtraConfig,
[extra] TOML section name is unprecedented in this codebase. zero matches in legacy config.toml, no pub extra field anywhere else. the only field inside is namespace, so ExtraConfig is a single-field wrapper. operator-facing schema decision worth getting right before this lands. rename to [system.namespace] (sibling of system.sharding, which already lives there) or top-level [namespace]; drop the ExtraConfig wrapper.
On:

    .await
    }

    pub async fn load_with_path(
also config_provider_with_default at line 222. new public API with no external callers. ServerNgConfig::load reimplements path/include logic at core/configs/src/server_ng_config/server_ng.rs:116-131 instead of delegating to these helpers. all 5 grep hits are internal to this file. either route ServerNgConfig::load through the new helpers, or revert these methods - right now they pay public-API tax for zero callers.
On:

    commit_min <= commit_max,
    "commit_min ({commit_min}) must be <= commit_max ({commit_max})"
    );
    assert_eq!(
restore_commit_state asserts commit_min == 0 && commit_max == 0 on entry. today fine, but if VsrConsensus::new ever changes its initial commit defaults (e.g., for recovery hooks), the asserts break silently in tests but mask corruption in prod. replace with a Builder pattern, or a restored: Cell<bool> guard set on first restore so the contract is enforced by construction.
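The `restored` guard could be shaped like this sketch; the struct fields mirror the comment but the type is illustrative:

```rust
use std::cell::Cell;

// Sketch: enforce the restore-exactly-once contract with its own flag rather
// than by asserting that the constructor's commit defaults are still zero.
struct VsrConsensus {
    commit_min: u64,
    commit_max: u64,
    restored: Cell<bool>,
}

impl VsrConsensus {
    fn new() -> Self {
        Self { commit_min: 0, commit_max: 0, restored: Cell::new(false) }
    }

    fn restore_commit_state(&mut self, commit_min: u64, commit_max: u64) {
        // the guard survives even if new() ever changes its initial defaults
        assert!(!self.restored.replace(true), "restore_commit_state called twice");
        assert!(commit_min <= commit_max, "commit_min must be <= commit_max");
        self.commit_min = commit_min;
        self.commit_max = commit_max;
    }
}

fn main() {
    let mut consensus = VsrConsensus::new();
    consensus.restore_commit_state(3, 7);
    assert_eq!((consensus.commit_min, consensus.commit_max), (3, 7));
    assert!(consensus.restored.get());
    println!("restore contract enforced by construction");
}
```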
On:

    .await?;

    // 4. Replay journal entries after snapshot
    let headers_to_replay = journal.iter_headers_from(replay_from);
replay loop mux_stm.update(entry)? aborts on first error and recover() returns Err, bootstrap fails. the journal still contains those entries, so an operator restart hits the same error - node is permanently un-bootable until the journal is manually truncated. there's no skip-and-repair-via-VSR path and no contiguity check on the WAL. compounds with PrepareJournal SLOT_COUNT = 1024 silent eviction. confirm intentional fail-fast for now and document the repair path; longer-term, walk the hash chain and stop at the first break.
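The longer-term hash-chain walk can be sketched as follows; the entry layout and hashes are invented for illustration:

```rust
// Sketch: replay only the verified contiguous prefix of the journal, stopping
// at the first chain break instead of leaving the node un-bootable.
struct JournalEntry {
    prev_hash: u64,
    hash: u64,
}

fn verified_prefix_len(entries: &[JournalEntry], snapshot_hash: u64) -> usize {
    let mut expected = snapshot_hash;
    for (i, entry) in entries.iter().enumerate() {
        if entry.prev_hash != expected {
            return i; // chain broken: stop here, repair the tail via VSR
        }
        expected = entry.hash;
    }
    entries.len()
}

fn main() {
    let entries = [
        JournalEntry { prev_hash: 0, hash: 1 },
        JournalEntry { prev_hash: 1, hash: 2 },
        JournalEntry { prev_hash: 99, hash: 3 }, // break: prev_hash mismatch
    ];
    assert_eq!(verified_prefix_len(&entries, 0), 2);
    assert_eq!(verified_prefix_len(&entries[..2], 0), 2);
    println!("replay stops at the first hash-chain break");
}
```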
On:

    bits_required((max_partitions - 1) as u64)
    };

    let required_bits = stream_bits
ExceedsU64 { required_bits: u32::MAX } placeholder is unreachable. bits_required returns u32 ≤ 64 (loop bounded by u64::BITS at lines 28-38); checked_add of three values each ≤ 64 has max sum 192, cannot overflow u32. drop the checked_add branch and sum directly - the required_bits > u64::BITS check below is the real guard.
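The bound argument is easy to demonstrate; `bits_required` below is a leading-zeros stand-in for the loop-based original, and `required_bits` shows the direct sum:

```rust
// Sketch: bits_required never exceeds u64::BITS (64), so the sum of three
// results tops out at 192 and fits in u32 without checked_add; the real
// guard is the `> u64::BITS` comparison.
fn bits_required(max_value: u64) -> u32 {
    u64::BITS - max_value.leading_zeros()
}

fn required_bits(stream_bits: u32, topic_bits: u32, partition_bits: u32) -> Option<u32> {
    let total = stream_bits + topic_bits + partition_bits; // cannot overflow u32
    (total <= u64::BITS).then_some(total)
}

fn main() {
    assert_eq!(bits_required(1023), 10); // e.g. max_partitions - 1 = 1023
    assert_eq!(required_bits(30, 20, 14), Some(64));
    assert_eq!(required_bits(30, 21, 14), None); // 65 > u64::BITS
    println!("direct sum; the ExceedsU64 placeholder branch is unreachable");
}
```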
Implement Replica bootstrap logic. Currently it does not handle cases where replicas are out of sync (needs State Transfer to be implemented).