diff --git a/architecture/gateway.md b/architecture/gateway.md
index d89706e64..c5fd349c7 100644
--- a/architecture/gateway.md
+++ b/architecture/gateway.md
@@ -74,7 +74,7 @@ The storage schema is intentionally narrow:
 | `scope` | Optional owner or namespace for scoped/versioned records, such as a sandbox ID for policy revisions. |
 | `version` | Optional monotonically increasing version for scoped records. |
 | `status` | Optional workflow state for records such as policy revisions or draft policy chunks. |
-| `dedup_key` and `hit_count` | Optional policy-advisor fields for coalescing repeated observations. |
+| `dedup_key` and `hit_count` | Optional policy-advisor fields for coalescing repeated observations. Draft policy chunks set `dedup_key` only while pending; the slot is released when the draft is approved or rejected, so a later denial for the same destination can surface as a new pending draft. |
 | `payload` | Prost-encoded protobuf payload for the full domain object. |
 | `created_at_ms` and `updated_at_ms` | Gateway timestamps used for ordering and list output. |
 | `labels` | JSON object carrying Kubernetes-style object labels for filtering and organization. |
diff --git a/architecture/security-policy.md b/architecture/security-policy.md
index e5f179dc1..933f5ecf1 100644
--- a/architecture/security-policy.md
+++ b/architecture/security-policy.md
@@ -72,6 +72,19 @@ recommendations:
 4. A human or admin workflow approves or rejects drafts.
 5. Approved drafts merge into the target sandbox policy.
+Drafts dedup on `(sandbox, host, port, binary)` while they are pending, so
+repeat denials accumulate hits on a single recommendation. Once a draft is
+decided (approved or rejected), it releases its dedup slot. A fresh denial
+against the same destination — for example, when a hostname rule's
+`allowed_ips` go stale after DNS resolves to a new backend — therefore
+surfaces as a new pending draft carrying the newly observed details rather
+than being silently absorbed by the existing decision.
+
+Because decided drafts can coexist with pending peers for the same
+destination, the gateway refuses approve, reject, and undo operations that
+would otherwise overwrite or strip a rule another draft contributes to. The
+error names the conflicting peer so the operator can decide it first.
+
 The advisor should propose narrow additions and preserve explicit-deny
 behavior. It is a workflow aid, not an automatic permission grant.
diff --git a/crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql b/crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql
new file mode 100644
index 000000000..f4fed4d2e
--- /dev/null
+++ b/crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql
@@ -0,0 +1,10 @@
+-- Issue #1245: only pending draft chunks should hold a dedup_key. Approved
+-- and rejected chunks held the same (host, port, binary) key as pending ones,
+-- so a fresh denial submission for the same key was silently absorbed by the
+-- decided row's ON CONFLICT update. Clear the key on existing decided rows
+-- so future submissions can insert as new pending chunks.
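+--
+-- For reference: the pending-only dedup slot is enforced by a partial unique
+-- index over dedup_key (`objects_dedup_uq`). Sketch only: the exact column
+-- list is assumed here; what matters is the WHERE clause that exempts NULL
+-- keys, so clearing dedup_key on decided rows frees the slot immediately.
+--
+--   CREATE UNIQUE INDEX objects_dedup_uq
+--       ON objects (object_type, scope, dedup_key)
+--       WHERE dedup_key IS NOT NULL;
+--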
+UPDATE objects + SET dedup_key = NULL + WHERE object_type = 'draft_policy_chunk' + AND status IN ('approved', 'rejected') + AND dedup_key IS NOT NULL; diff --git a/crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql b/crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql new file mode 100644 index 000000000..f4fed4d2e --- /dev/null +++ b/crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql @@ -0,0 +1,10 @@ +-- Issue #1245: only pending draft chunks should hold a dedup_key. Approved +-- and rejected chunks held the same (host, port, binary) key as pending ones, +-- so a fresh denial submission for the same key was silently absorbed by the +-- decided row's ON CONFLICT update. Clear the key on existing decided rows +-- so future submissions can insert as new pending chunks. +UPDATE objects + SET dedup_key = NULL + WHERE object_type = 'draft_policy_chunk' + AND status IN ('approved', 'rejected') + AND dedup_key IS NOT NULL; diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 2c62c930a..3f7cbf01a 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -53,7 +53,7 @@ use std::collections::{BTreeMap, HashMap}; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; use tonic::{Request, Response, Status}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use super::validation::{ level_matches, source_matches, validate_policy_safety, validate_static_fields_unchanged, @@ -1470,6 +1470,50 @@ pub(super) async fn handle_approve_draft_chunk( ))); } + // After issue #1245, a rejected chunk and a pending peer can coexist for + // the same (host, port, binary). Approving the rejected chunk would push + // its (possibly stale) proposed_rule into the live policy while leaving + // the newer pending peer queued — a subsequent approve of that peer then + // overwrites the rule with last-writer-wins semantics. Require the + // operator to decide the peer first so the approval reflects the latest + // observed proposal rather than an order-dependent merge. + if chunk.status == "rejected" { + if let Some(peer) = state + .store + .find_pending_draft_chunk_for_key(&sandbox_id, &chunk.host, chunk.port, &chunk.binary) + .await + .map_err(|e| Status::internal(format!("find pending peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot approve rejected chunk {}: a pending chunk ({}) already exists for \ + the same destination ({}:{} from {}). Decide that pending chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + // Another approved chunk for the same key can also coexist post-#1245. + // Approving this rejected chunk would push its (possibly stale) rule + // through `merge_chunk_into_policy`, overwriting the peer's contribution + // with last-writer-wins semantics. + if let Some(peer) = state + .store + .find_other_approved_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + &req.chunk_id, + ) + .await + .map_err(|e| Status::internal(format!("find approved peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot approve rejected chunk {}: another approved chunk ({}) is already \ + active for the rule at {}:{} from {}. 
Undo or reject that chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + } + info!( sandbox_id = %sandbox_id, chunk_id = %req.chunk_id, @@ -1571,6 +1615,46 @@ pub(super) async fn handle_reject_draft_chunk( if was_approved { require_no_global_policy(state).await?; + // A pending peer for the same (host, port, binary) is reachable after + // issue #1245: approving a chunk releases its dedup slot, letting a + // fresh denial surface as a new pending row. Rejecting the approved + // chunk while that peer exists would strip the rule from policy but + // leave the peer queued — approving it later silently re-installs the + // rule the operator just rejected. Require the operator to decide + // the peer first so the intent is unambiguous. + if let Some(peer) = state + .store + .find_pending_draft_chunk_for_key(&sandbox_id, &chunk.host, chunk.port, &chunk.binary) + .await + .map_err(|e| Status::internal(format!("find pending peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot reject approved chunk {}: a pending chunk ({}) already exists for \ + the same destination ({}:{} from {}). Decide that pending chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + // Another approved chunk for the same key can also coexist post-#1245. + // `remove_chunk_from_policy` strips by rule_name+binary_path, so it + // would also remove the peer's contribution. Refuse and point at it. + if let Some(peer) = state + .store + .find_other_approved_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + &req.chunk_id, + ) + .await + .map_err(|e| Status::internal(format!("find approved peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot reject approved chunk {}: another approved chunk ({}) also \ + contributes to the rule at {}:{} from {}. Undo or reject that chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } let (version, hash) = remove_chunk_from_policy(state, &sandbox_id, &chunk).await?; emit_gateway_policy_audit_log( &sandbox_id, @@ -1803,6 +1887,65 @@ pub(super) async fn handle_undo_draft_chunk( ))); } + // Another approved chunk for the same (host, port, binary) can coexist + // after issue #1245 (decided chunks release the dedup slot). Since + // `remove_chunk_from_policy` strips by rule_name + binary_path rather than + // chunk identity, undoing this chunk would also strip the rule the other + // approved chunk contributed. Refuse and point the operator at the peer. + if let Some(peer) = state + .store + .find_other_approved_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + &req.chunk_id, + ) + .await + .map_err(|e| Status::internal(format!("find approved peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot undo approved chunk {}: another approved chunk ({}) also \ + contributes to the rule at {}:{} from {}. Undo or reject that chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + + // Flip the chunk back to pending before touching the active policy. A + // concurrent denial can land a pending peer at any time, and reverting an + // approved chunk to pending recomputes its dedup_key — the collision must + // be detected on the partial unique index, not via a preflight that races + // the subsequent UPDATE. 
If the UPDATE succeeds we proceed to mutate + // policy; if it fails with a dedup-slot conflict we surface a clean + // FailedPrecondition without having committed any policy revision (#1245). + if let Err(e) = state + .store + .update_draft_chunk_status(&req.chunk_id, "pending", None) + .await + { + if e.is_unique_violation_on("objects_dedup_uq") { + let peer_descriptor = state + .store + .find_pending_draft_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + ) + .await + .ok() + .flatten() + .map(|peer| format!(" ({})", peer.id)) + .unwrap_or_default(); + return Err(Status::failed_precondition(format!( + "cannot undo approved chunk {}: a pending chunk{} already exists for \ + the same destination ({}:{} from {}). Decide that pending chunk first.", + req.chunk_id, peer_descriptor, chunk.host, chunk.port, chunk.binary, + ))); + } + return Err(Status::internal(format!("update chunk status failed: {e}"))); + } + info!( sandbox_id = %sandbox_id, chunk_id = %req.chunk_id, @@ -1812,13 +1955,34 @@ pub(super) async fn handle_undo_draft_chunk( "UndoDraftChunk: removing rule from active policy" ); - let (version, hash) = remove_chunk_from_policy(state, &sandbox_id, &chunk).await?; - - state - .store - .update_draft_chunk_status(&req.chunk_id, "pending", None) - .await - .map_err(|e| Status::internal(format!("update chunk status failed: {e}")))?; + let (version, hash) = match remove_chunk_from_policy(state, &sandbox_id, &chunk).await { + Ok(result) => result, + Err(policy_err) => { + // The status flip already committed. If we propagate the error + // without rolling back, the chunk is stuck in `pending` while the + // rule is still live — and a subsequent reject takes the simple + // (was_approved=false) branch that never touches policy, locking + // the rule in place. Best-effort rollback to `approved` with the + // original `decided_at_ms`. Rollback can itself collide on the + // dedup index if a fresh denial raced in between the two writes; + // log loudly in that case so operators can grep for divergence. + if let Err(rollback_err) = state + .store + .update_draft_chunk_status(&req.chunk_id, "approved", chunk.decided_at_ms) + .await + { + error!( + sandbox_id = %sandbox_id, + chunk_id = %req.chunk_id, + policy_err = %policy_err, + rollback_err = %rollback_err, + "UndoDraftChunk: rule removal failed AND status rollback failed — \ + chunk left in pending state with rule still live in policy" + ); + } + return Err(policy_err); + } + }; state.sandbox_watch_bus.notify(&sandbox_id); emit_gateway_policy_audit_log( @@ -4987,4 +5151,2037 @@ mod tests { assert_eq!(err.code(), Code::InvalidArgument); assert!(err.message().contains("unknown setting key")); } + + /// A fresh denial against an existing approved rule (e.g. hostname rule + /// with stale `allowed_ips`) must surface as a new pending chunk so the + /// operator can extend the rule. Approved chunks have `dedup_key = NULL`, + /// so they don't collide with new pending submissions for the same + /// `(host, port, binary)`. See issue #1245. 
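+    /// (For intuition only, not the stored key format: the dedup slot this
+    /// test exercises is conceptually `internal-api.example.com:443:/usr/bin/curl`.)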
+ #[tokio::test] + async fn denial_for_existing_approved_rule_should_surface_pending_chunk() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-stale-allowed-ips".to_string(), + name: "stale-allowed-ips".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let proposed_rule = NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec!["10.0.5.10".to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Step 1: first denial → mapper proposes a chunk → operator approves. + let submit = handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(proposed_rule.clone()), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(submit.accepted_chunks, 1); + + let pending = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(pending.chunks.len(), 1); + let chunk_id = pending.chunks[0].id.clone(); + + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: chunk_id.clone(), + }), + ) + .await + .unwrap(); + + // After approve, the rule is live with `allowed_ips: ["10.0.5.10"]`. + // No pending chunks — confirmed baseline. + let pending_after_approve = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(pending_after_approve.chunks.len(), 0); + + // Step 2: time passes, DNS for internal-api.example.com flips to a + // different backend (e.g. 10.0.5.99), the proxy denies the connection + // because the resolved IP is not in `allowed_ips`, and the mechanistic + // mapper generates a fresh proposal for the SAME (host, port, binary). + // The proposal here would carry the *new* allowed_ips it observed. 
+        let fresh_proposed_rule = NetworkPolicyRule {
+            name: "allow_internal_api".to_string(),
+            endpoints: vec![NetworkEndpoint {
+                host: "internal-api.example.com".to_string(),
+                port: 443,
+                allowed_ips: vec!["10.0.5.99".to_string()],
+                ..Default::default()
+            }],
+            binaries: vec![NetworkBinary {
+                path: "/usr/bin/curl".to_string(),
+                ..Default::default()
+            }],
+        };
+
+        handle_submit_policy_analysis(
+            &state,
+            Request::new(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_internal_api".to_string(),
+                    proposed_rule: Some(fresh_proposed_rule),
+                    rationale: "denial after DNS flip — resolved IP not in allowed_ips".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 200,
+                    last_seen_ms: 200,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        // Step 3: operator opens the TUI / queries draft policy expecting to
+        // see something to act on. Before the #1245 fix this assertion
+        // failed: zero pending chunks — the dedup `ON CONFLICT` only bumped
+        // hit_count on the already-approved chunk, so the TUI's pending-rule
+        // surface stayed silent and the operator had no in-product signal.
+        let pending_after_second_denial = handle_get_draft_policy(
+            &state,
+            Request::new(GetDraftPolicyRequest {
+                name: sandbox_name,
+                status_filter: "pending".to_string(),
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        assert!(
+            !pending_after_second_denial.chunks.is_empty(),
+            "expected a pending chunk to surface the stale-allowed_ips denial, \
+             but got zero — operator has no in-product signal"
+        );
+        // The fix isn't just "a chunk surfaces" — the surfaced chunk must
+        // carry the *new* allowed_ips so the operator can extend the rule.
+        let surfaced_rule = pending_after_second_denial.chunks[0]
+            .proposed_rule
+            .as_ref()
+            .expect("surfaced chunk must include a proposed rule");
+        let surfaced_allowed_ips: Vec<&str> = surfaced_rule.endpoints[0]
+            .allowed_ips
+            .iter()
+            .map(String::as_str)
+            .collect();
+        assert_eq!(
+            surfaced_allowed_ips,
+            vec!["10.0.5.99"],
+            "surfaced pending chunk must reflect the new resolved IP, not the original"
+        );
+    }
+
+    /// Issue #1245 follow-up: once a fresh denial against an existing approved
+    /// rule surfaces as a pending peer, undoing the approved chunk would
+    /// recompute the same `dedup_key` and collide on the partial unique index.
+    /// The handler must reject the undo with `FailedPrecondition` BEFORE
+    /// touching the active policy, leaving DB and policy state consistent.
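+    /// (The index in question is `objects_dedup_uq`; the undo handler detects
+    /// the collision via `is_unique_violation_on("objects_dedup_uq")`.)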
+ #[tokio::test] + async fn undo_approved_chunk_with_competing_pending_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-undo-collision".to_string(), + name: "undo-collision".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + let sandbox_id = sandbox.object_id().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Submit + approve the first denial. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + + let approved_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap(); + + // A fresh denial creates a pending peer for the same (host, port, binary). + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "denial after VPN flip".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + + let pending_after_resubmit = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(pending_after_resubmit.chunks.len(), 1); + let peer_chunk_id = pending_after_resubmit.chunks[0].id.clone(); + assert_ne!(peer_chunk_id, approved_chunk_id); + + let revisions_before_undo = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let live_version_before = revisions_before_undo.revisions[0].version; + + // Undo must refuse before mutating policy. 
+ let undo_err = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(undo_err.code(), Code::FailedPrecondition); + assert!( + undo_err.message().contains(&peer_chunk_id), + "error message should reference the conflicting pending chunk; got: {}", + undo_err.message(), + ); + + // The approved chunk is still approved; the pending peer is still pending. + let approved_after = state + .store + .get_draft_chunk(&approved_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(approved_after.status, "approved"); + let peer_after = state + .store + .get_draft_chunk(&peer_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(peer_after.status, "pending"); + + // No new policy revision was created. + let revisions_after_undo = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + revisions_after_undo.revisions[0].version, live_version_before, + "undo must not produce a new policy revision when it fails on the dedup slot" + ); + let _ = sandbox_id; + } + + /// The recovery path documented in the precondition error message + /// ("decide that pending chunk first") must actually unblock the undo. + /// Without this test, a future regression in `update_draft_chunk_status` + /// or in dedup-key recomputation could silently break operator recovery + /// while the collision test above still passes. + #[tokio::test] + async fn undo_succeeds_after_deciding_the_competing_pending_peer() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, RejectDraftChunkRequest, + SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-undo-recovery".to_string(), + name: "undo-recovery".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let approved_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + 
Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap(); + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let peer_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + // First undo blocks on the peer. + handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + + // Operator decides the peer (reject), then retries the undo. + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: peer_chunk_id.clone(), + reason: "superseded by undo".to_string(), + }), + ) + .await + .unwrap(); + + let undo = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap() + .into_inner(); + assert!(undo.policy_version > 0); + assert!(!undo.policy_hash.is_empty()); + + let approved_after = state + .store + .get_draft_chunk(&approved_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!( + approved_after.status, "pending", + "after recovery, the previously approved chunk must be pending" + ); + } + + /// Rejecting an approved chunk while a pending peer exists for the same + /// (host, port, binary) must refuse without touching the live policy. The + /// peer represents the operator's newer signal; reject silently stripping + /// the rule would create a confusing approval loop. 
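+    /// (The recovery path mirrors undo: decide the pending peer first, then
+    /// retry the reject.)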
+ #[tokio::test] + async fn reject_approved_chunk_with_pending_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, RejectDraftChunkRequest, + SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-reject-peer".to_string(), + name: "reject-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let approved_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap(); + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let peer_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + let reject_err = handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + reason: "operator wants to revoke".to_string(), + }), + ) + .await + .unwrap_err(); + assert_eq!(reject_err.code(), Code::FailedPrecondition); + assert!(reject_err.message().contains(&peer_chunk_id)); + + let approved_after = state + .store + .get_draft_chunk(&approved_chunk_id) + .await 
+ .unwrap() + .unwrap(); + assert_eq!(approved_after.status, "approved"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + } + + /// Approving a rejected chunk while a pending peer carries a newer + /// `proposed_rule` would push stale `allowed_ips` into the policy and + /// invite an order-dependent overwrite when the peer is later approved. + /// Refuse instead. + #[tokio::test] + async fn approve_rejected_chunk_with_pending_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, RejectDraftChunkRequest, + SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-approve-rejected-peer".to_string(), + name: "approve-rejected-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Seed a pending chunk, reject it (releasing its dedup slot). + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let rejected_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_chunk_id.clone(), + reason: "noisy first attempt".to_string(), + }), + ) + .await + .unwrap(); + + // Fresh denial creates a pending peer. 
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let peer_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + let approve_err = handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(approve_err.code(), Code::FailedPrecondition); + assert!(approve_err.message().contains(&peer_chunk_id)); + + let rejected_after = state + .store + .get_draft_chunk(&rejected_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(rejected_after.status, "rejected"); + } + + /// Codex round-3 finding: `dedup_key=NULL` on decided chunks lets two + /// `approved` rows coexist for the same `(host, port, binary)`. The + /// pending-peer preflight does not detect this case — undo of the older + /// approved chunk would silently strip the rule that the newer approved + /// chunk installed (because `remove_chunk_from_policy` operates by + /// `rule_name` + `binary_path`, not chunk identity). + #[tokio::test] + async fn undo_approved_chunk_with_other_approved_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-decided-peer".to_string(), + name: "decided-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + let sandbox_id = sandbox.object_id().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Approve A — rule R installed with 10.0.5.10. 
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let first_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Fresh denial → pending B. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let second_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + // Approve B — this is the #1245 workflow (extend rule). Must succeed. + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: second_approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Both A and B are now approved; rule R reflects the merged state. + let chunk_a = state + .store + .get_draft_chunk(&first_approved_id) + .await + .unwrap() + .unwrap(); + let chunk_b = state + .store + .get_draft_chunk(&second_approved_id) + .await + .unwrap() + .unwrap(); + assert_eq!(chunk_a.status, "approved"); + assert_eq!(chunk_b.status, "approved"); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + // Undo A must refuse: undoing it would strip the rule B installed. + let undo_err = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(undo_err.code(), Code::FailedPrecondition); + assert!( + undo_err.message().contains(&second_approved_id), + "error must name the conflicting approved chunk; got: {}", + undo_err.message(), + ); + + // A must still be approved; no new policy revision. 
+ let chunk_a_after = state + .store + .get_draft_chunk(&first_approved_id) + .await + .unwrap() + .unwrap(); + assert_eq!(chunk_a_after.status, "approved"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + let _ = sandbox_id; + } + + /// Sibling of `undo_approved_chunk_with_other_approved_peer_...` for the + /// approve path: approving a *rejected* chunk while another approved + /// chunk owns the same `(host, port, binary)` would push the rejected + /// chunk's (possibly stale) rule body through `merge_chunk_into_policy`, + /// overwriting the peer's contribution with last-writer-wins semantics. + /// The handler must refuse and name the conflicting peer. + #[tokio::test] + async fn approve_rejected_chunk_with_other_approved_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-approve-peer".to_string(), + name: "approve-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Approve A — rule R installed with 10.0.5.10. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Fresh denial → pending B for the same key. 
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let rejected_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + // Reject B while still pending — this skips the approved-peer check + // (was_approved == false) and leaves B in `rejected` state while A + // is still approved. + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_id.clone(), + reason: "discarded".to_string(), + }), + ) + .await + .unwrap(); + let rejected_b = state + .store + .get_draft_chunk(&rejected_id) + .await + .unwrap() + .unwrap(); + assert_eq!(rejected_b.status, "rejected"); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + // Re-approve B (currently rejected) — must refuse: A's approved rule + // owns this key, and merging B would silently overwrite it. + let approve_err = handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(approve_err.code(), Code::FailedPrecondition); + assert!( + approve_err.message().contains(&approved_id), + "error must name the conflicting approved chunk; got: {}", + approve_err.message(), + ); + + // B must still be rejected; no new policy revision. + let rejected_after = state + .store + .get_draft_chunk(&rejected_id) + .await + .unwrap() + .unwrap(); + assert_eq!(rejected_after.status, "rejected"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + } + + /// Sibling of `undo_approved_chunk_with_other_approved_peer_...` for the + /// reject path: rejecting an approved chunk while another approved chunk + /// also contributes to the rule would call `remove_chunk_from_policy` + /// (which strips by `rule_name` + `binary_path`), stripping the peer's + /// contribution too. The handler must refuse and name the peer. 
+ #[tokio::test] + async fn reject_approved_chunk_with_other_approved_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-reject-peer".to_string(), + name: "reject-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Approve A. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let first_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Fresh denial → pending B. Approve B (rule extended; both approved). + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let second_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: second_approved_id.clone(), + }), + ) + .await + .unwrap(); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + // Reject A must refuse: rejecting it calls `remove_chunk_from_policy`, + // which strips by rule_name + binary_path and would remove B's + // contribution too. 
+ let reject_err = handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + reason: "operator change of mind".to_string(), + }), + ) + .await + .unwrap_err(); + assert_eq!(reject_err.code(), Code::FailedPrecondition); + assert!( + reject_err.message().contains(&second_approved_id), + "error must name the conflicting approved chunk; got: {}", + reject_err.message(), + ); + + // A must still be approved; no new policy revision. + let chunk_a_after = state + .store + .get_draft_chunk(&first_approved_id) + .await + .unwrap() + .unwrap(); + assert_eq!(chunk_a_after.status, "approved"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + } + + /// Round-3 finding: undo flips the chunk's status to `pending` BEFORE + /// calling `remove_chunk_from_policy`, so that a dedup-slot collision can + /// be detected on the partial unique index. If the policy mutation then + /// fails (DB error, version-retry exhaustion, validation), the chunk is + /// left in `pending` state while the rule is still live — and the + /// operator's natural recovery move (reject the pending chunk) hits the + /// `was_approved == false` branch that never calls + /// `remove_chunk_from_policy`, locking the rule in place permanently. + /// The handler must compensate by rolling the status back to `approved` + /// on remove failure. + #[tokio::test] + async fn undo_rolls_back_status_when_remove_chunk_from_policy_fails() { + use crate::persistence::Store; + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-orphan-rule".to_string(), + name: "orphan-rule".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let rule = NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec!["10.0.5.10".to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(rule.clone()), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + let approve_response = 
handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: chunk_id.clone(), + }), + ) + .await + .unwrap() + .into_inner(); + let version_after_approve = approve_response.policy_version; + let hash_after_approve = approve_response.policy_hash.clone(); + + let chunk_before = state + .store + .get_draft_chunk(&chunk_id) + .await + .unwrap() + .unwrap(); + let decided_at_before = chunk_before.decided_at_ms; + assert!(decided_at_before.is_some()); + + // Inject a failure into the next sandbox_policy revision insert. + // The approve above already wrote version 1; the undo's + // `remove_chunk_from_policy` will try to write version 2 and trip + // the trigger. The trigger raises a non-unique-violation error, so + // `apply_merge_operations_with_retry` returns Status::internal on + // its first attempt without entering the retry loop. + let sqlite_pool = match &*state.store { + Store::Sqlite(s) => s.pool().clone(), + Store::Postgres(_) => panic!("test requires sqlite-backed store"), + }; + sqlx::query( + r#" +CREATE TRIGGER fail_next_policy_revision +BEFORE INSERT ON "objects" +WHEN NEW.object_type = 'sandbox_policy' +BEGIN + SELECT RAISE(ABORT, 'simulated DB failure for orphan-rule test'); +END +"#, + ) + .execute(&sqlite_pool) + .await + .unwrap(); + + let undo_err = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + // The error surface isn't the focus — any non-OK is acceptable. + // The important assertion is the post-failure state below. + let _ = undo_err; + + // The chunk must NOT be left in `pending`. The rollback should + // restore `approved` with the original decided_at_ms preserved, so + // that subsequent operator actions (retry undo, reject, approve) + // behave the same as if the undo had never been attempted. + let chunk_after = state + .store + .get_draft_chunk(&chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!( + chunk_after.status, "approved", + "chunk must be rolled back to approved when remove_chunk_from_policy fails — \ + otherwise the rule is orphaned (live in policy, no approved chunk record)" + ); + assert_eq!( + chunk_after.decided_at_ms, decided_at_before, + "rollback must preserve the original decided_at_ms", + ); + + // The live policy must also be unchanged — no version 2 row written, + // and the approved revision's hash must still match what approve + // produced. Otherwise the rule body could drift even when the chunk + // status rolls back correctly. + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + revisions_after.revisions.len(), + 1, + "no new policy revision should have been created when remove_chunk_from_policy failed" + ); + assert_eq!(revisions_after.revisions[0].version, version_after_approve); + assert_eq!(revisions_after.revisions[0].policy_hash, hash_after_approve); + + // Tear down the trigger so cleanup paths (if any) work. + sqlx::query("DROP TRIGGER fail_next_policy_revision") + .execute(&sqlite_pool) + .await + .unwrap(); + } + + /// Repeat denials with the same `(host, port, binary)` while a pending + /// chunk already exists must dedup — the existing pending row's `hit_count` + /// should accumulate rather than producing a fresh row each flush. 
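+    /// (This is the other half of the #1245 contract: pending rows keep
+    /// absorbing repeats via the submit path's `ON CONFLICT` hit-count bump;
+    /// only decided rows release their slot.)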
+    #[tokio::test]
+    async fn repeat_pending_denials_still_dedup() {
+        use openshell_core::proto::{
+            GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec,
+        };
+
+        let state = test_server_state().await;
+        let sandbox = Sandbox {
+            metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
+                id: "sb-dedup-pending".to_string(),
+                name: "dedup-pending".to_string(),
+                created_at_ms: 1_000_000,
+                labels: std::collections::HashMap::new(),
+            }),
+            spec: Some(SandboxSpec {
+                policy: None,
+                ..Default::default()
+            }),
+            phase: SandboxPhase::Ready as i32,
+            ..Default::default()
+        };
+        state.store.put_message(&sandbox).await.unwrap();
+        let sandbox_name = sandbox.object_name().to_string();
+
+        let proposed_rule = NetworkPolicyRule {
+            name: "allow_api".to_string(),
+            endpoints: vec![NetworkEndpoint {
+                host: "api.example.com".to_string(),
+                port: 443,
+                ..Default::default()
+            }],
+            binaries: vec![NetworkBinary {
+                path: "/usr/bin/curl".to_string(),
+                ..Default::default()
+            }],
+        };
+
+        for hit in 1..=3 {
+            handle_submit_policy_analysis(
+                &state,
+                Request::new(SubmitPolicyAnalysisRequest {
+                    name: sandbox_name.clone(),
+                    proposed_chunks: vec![PolicyChunk {
+                        rule_name: "allow_api".to_string(),
+                        proposed_rule: Some(proposed_rule.clone()),
+                        rationale: format!("denial #{hit}"),
+                        confidence: 0.9,
+                        hit_count: 1,
+                        first_seen_ms: 100,
+                        last_seen_ms: 100 + i64::from(hit),
+                        binary: "/usr/bin/curl".to_string(),
+                        ..Default::default()
+                    }],
+                    ..Default::default()
+                }),
+            )
+            .await
+            .unwrap();
+        }
+
+        let pending = handle_get_draft_policy(
+            &state,
+            Request::new(GetDraftPolicyRequest {
+                name: sandbox_name,
+                status_filter: "pending".to_string(),
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        assert_eq!(
+            pending.chunks.len(),
+            1,
+            "repeat pending denials should merge into a single pending chunk",
+        );
+        assert_eq!(pending.chunks[0].hit_count, 3);
+    }
+
+    /// Rejected chunks must release their dedup slot so a future denial for
+    /// the same `(host, port, binary)` can surface as a new pending chunk.
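+    ///
+    /// Expected row states step by step (a sketch; key format per
+    /// `draft_chunk_dedup_key_for_status` later in this diff):
+    ///
+    /// ```text
+    /// submit -> pending,  dedup_key = "api.example.com|443|/usr/bin/curl"
+    /// reject -> rejected, dedup_key = NULL   (slot released)
+    /// submit -> NEW pending row, dedup_key = "api.example.com|443|/usr/bin/curl"
+    /// ```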
+    #[tokio::test]
+    async fn rejected_chunk_does_not_block_new_pending() {
+        use openshell_core::proto::{
+            GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec,
+        };
+
+        let state = test_server_state().await;
+        let sandbox = Sandbox {
+            metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
+                id: "sb-rejected-release".to_string(),
+                name: "rejected-release".to_string(),
+                created_at_ms: 1_000_000,
+                labels: std::collections::HashMap::new(),
+            }),
+            spec: Some(SandboxSpec {
+                policy: None,
+                ..Default::default()
+            }),
+            phase: SandboxPhase::Ready as i32,
+            ..Default::default()
+        };
+        state.store.put_message(&sandbox).await.unwrap();
+        let sandbox_name = sandbox.object_name().to_string();
+
+        let proposed_rule = NetworkPolicyRule {
+            name: "allow_api".to_string(),
+            endpoints: vec![NetworkEndpoint {
+                host: "api.example.com".to_string(),
+                port: 443,
+                ..Default::default()
+            }],
+            binaries: vec![NetworkBinary {
+                path: "/usr/bin/curl".to_string(),
+                ..Default::default()
+            }],
+        };
+
+        handle_submit_policy_analysis(
+            &state,
+            Request::new(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_api".to_string(),
+                    proposed_rule: Some(proposed_rule.clone()),
+                    rationale: "first denial".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 100,
+                    last_seen_ms: 100,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        let pending = handle_get_draft_policy(
+            &state,
+            Request::new(GetDraftPolicyRequest {
+                name: sandbox_name.clone(),
+                status_filter: "pending".to_string(),
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        let chunk_id = pending.chunks[0].id.clone();
+
+        handle_reject_draft_chunk(
+            &state,
+            Request::new(RejectDraftChunkRequest {
+                name: sandbox_name.clone(),
+                chunk_id,
+                reason: "operator rejected".to_string(),
+            }),
+        )
+        .await
+        .unwrap();
+
+        // A new denial for the same key after rejection should surface a
+        // fresh pending chunk — the rejected row no longer holds the slot.
+        handle_submit_policy_analysis(
+            &state,
+            Request::new(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_api".to_string(),
+                    proposed_rule: Some(proposed_rule),
+                    rationale: "denial after rejection".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 200,
+                    last_seen_ms: 200,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        let pending_after = handle_get_draft_policy(
+            &state,
+            Request::new(GetDraftPolicyRequest {
+                name: sandbox_name,
+                status_filter: "pending".to_string(),
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        assert_eq!(
+            pending_after.chunks.len(),
+            1,
+            "rejected chunk should release its dedup slot for new pending chunks",
+        );
+    }
+
+    /// Walk through the issue #1245 scenario end-to-end through a real tonic
+    /// server bound to a local TCP port. Manual demo; not part of the default
+    /// test run because it binds a real socket and emits a println! narrative.
+    /// Run with:
+    ///
+    /// `cargo test -p openshell-server --lib demo_stale_allowed_ips_via_grpc -- --ignored --nocapture`
+    ///
+    /// To watch the bug live, `git stash push -- crates/openshell-server/src/persistence/*.rs
+    /// crates/openshell-server/migrations` and rerun: the final assertion
+    /// fails and the narrative shows pending count = 0 after the stale-IP
+    /// denial. `git stash pop` restores the fix.
+    #[tokio::test]
+    #[ignore = "manual demo for issue #1245; run with --ignored --nocapture"]
+    async fn demo_stale_allowed_ips_via_grpc() {
+        use crate::OpenShellService;
+        use openshell_core::proto::open_shell_client::OpenShellClient;
+        use openshell_core::proto::open_shell_server::OpenShellServer;
+        use openshell_core::proto::{
+            ApproveDraftChunkRequest, GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint,
+            SandboxPhase, SandboxSpec, SubmitPolicyAnalysisRequest,
+        };
+        use tokio::net::TcpListener;
+        use tokio_stream::wrappers::TcpListenerStream;
+
+        let state = test_server_state().await;
+        let sandbox = Sandbox {
+            metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta {
+                id: "sb-demo-1245".to_string(),
+                name: "demo-1245".to_string(),
+                created_at_ms: 1_000_000,
+                labels: std::collections::HashMap::new(),
+            }),
+            spec: Some(SandboxSpec {
+                policy: None,
+                ..Default::default()
+            }),
+            phase: SandboxPhase::Ready as i32,
+            ..Default::default()
+        };
+        state.store.put_message(&sandbox).await.unwrap();
+        let sandbox_name = sandbox.object_name().to_string();
+
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+        let server_handle = tokio::spawn(async move {
+            tonic::transport::Server::builder()
+                .add_service(OpenShellServer::new(OpenShellService::new(state)))
+                .serve_with_incoming(TcpListenerStream::new(listener))
+                .await
+                .unwrap();
+        });
+
+        let endpoint = format!("http://{addr}");
+        let mut client = OpenShellClient::connect(endpoint.clone()).await.unwrap();
+
+        println!();
+        println!("=== Demo: stale allowed_ips fix (issue #1245) ===");
+        println!("gateway listening on {addr}");
+        println!("sandbox: {sandbox_name}");
+        println!();
+
+        let make_rule = |ip: &str| NetworkPolicyRule {
+            name: "allow_internal_api".to_string(),
+            endpoints: vec![NetworkEndpoint {
+                host: "internal-api.example.com".to_string(),
+                port: 443,
+                allowed_ips: vec![ip.to_string()],
+                ..Default::default()
+            }],
+            binaries: vec![NetworkBinary {
+                path: "/usr/bin/curl".to_string(),
+                ..Default::default()
+            }],
+        };
+
+        let pending_filter = || GetDraftPolicyRequest {
+            name: sandbox_name.clone(),
+            status_filter: "pending".to_string(),
+        };
+
+        println!("[1] supervisor reports first denial (resolved IP 10.0.5.10)");
+        let resp = client
+            .submit_policy_analysis(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_internal_api".to_string(),
+                    proposed_rule: Some(make_rule("10.0.5.10")),
+                    rationale: "first denial".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 100,
+                    last_seen_ms: 100,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            })
+            .await
+            .unwrap()
+            .into_inner();
+        println!(
+            "    accepted={} rejected={}",
+            resp.accepted_chunks, resp.rejected_chunks
+        );
+
+        let pending = client
+            .get_draft_policy(pending_filter())
+            .await
+            .unwrap()
+            .into_inner();
+        println!("    pending chunks: {}", pending.chunks.len());
+        let chunk_id = pending.chunks[0].id.clone();
+        println!();
+
+        println!("[2] operator approves chunk {chunk_id}");
+        client
+            .approve_draft_chunk(ApproveDraftChunkRequest {
+                name: sandbox_name.clone(),
+                chunk_id: chunk_id.clone(),
+            })
+            .await
+            .unwrap();
+        let pending = client
+            .get_draft_policy(pending_filter())
+            .await
+            .unwrap()
+            .into_inner();
+        println!(
+            "    pending chunks: {} (rule is now live)",
+            pending.chunks.len()
+        );
+        println!();
+
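+        // Step [3] is the regression at the heart of issue #1245: the same
+        // (host, port, binary) dedup key, but a different resolved IP in
+        // allowed_ips. Before the fix this submission was silently absorbed
+        // by the decided chunk's dedup slot.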
+        println!("[3] backend flips to 10.0.5.99 — supervisor reports fresh denial");
+        println!("    same (host, port, binary) but allowed_ips=[10.0.5.99]");
+        client
+            .submit_policy_analysis(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_internal_api".to_string(),
+                    proposed_rule: Some(make_rule("10.0.5.99")),
+                    rationale: "denial after backend flip".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 200,
+                    last_seen_ms: 200,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+
+        let pending = client
+            .get_draft_policy(pending_filter())
+            .await
+            .unwrap()
+            .into_inner();
+        println!("    pending chunks: {}", pending.chunks.len());
+        println!();
+        if pending.chunks.is_empty() {
+            println!(
+                "BUG REPRODUCED: stale-IP denial was absorbed by the approved chunk's \
+                 dedup slot. Operator has no in-product signal."
+            );
+        } else {
+            println!(
+                "FIX VERIFIED: stale-IP denial surfaced as a fresh pending chunk \
+                 (rationale: {:?})",
+                pending.chunks[0].rationale,
+            );
+        }
+        println!();
+
+        server_handle.abort();
+
+        assert!(
+            !pending.chunks.is_empty(),
+            "expected the stale-IP denial to surface a pending chunk after the fix",
+        );
+    }
 }
diff --git a/crates/openshell-server/src/persistence/postgres.rs b/crates/openshell-server/src/persistence/postgres.rs
index 2cd6a046f..a77768524 100644
--- a/crates/openshell-server/src/persistence/postgres.rs
+++ b/crates/openshell-server/src/persistence/postgres.rs
@@ -440,6 +440,68 @@ WHERE object_type = $1 AND id = $2
         row.map(row_to_draft_chunk_record).transpose()
     }
 
+    pub async fn find_pending_draft_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>> {
+        let dedup_key = draft_chunk_dedup_key_for_status("pending", host, port, binary);
+        let row = sqlx::query(
+            r"
+SELECT id, scope, status, hit_count, payload, created_at_ms, updated_at_ms
+FROM objects
+WHERE object_type = $1 AND scope = $2 AND status = 'pending' AND dedup_key = $3
+",
+        )
+        .bind(DRAFT_CHUNK_OBJECT_TYPE)
+        .bind(sandbox_id)
+        .bind(dedup_key)
+        .fetch_optional(&self.pool)
+        .await
+        .map_err(|e| map_db_error(&e))?;
+
+        row.map(row_to_draft_chunk_record).transpose()
+    }
+
+    /// Find any other approved draft chunk for `(sandbox_id, host, port, binary)`,
+    /// excluding the chunk identified by `exclude_chunk_id`. Approved chunks
+    /// have `dedup_key = NULL` (issue #1245), so the partial unique index does
+    /// not constrain them — multiple approved chunks can coexist for the same
+    /// key. Callers that intend to mutate a rule contributed to by one chunk
+    /// use this to detect when another decided chunk is also contributing.
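+    ///
+    /// A hypothetical call-site sketch (the approve handler in this change
+    /// follows this shape; names abbreviated):
+    ///
+    /// ```ignore
+    /// if let Some(peer) = store
+    ///     .find_other_approved_chunk_for_key(&sb, &chunk.host, chunk.port, &chunk.binary, &chunk.id)
+    ///     .await?
+    /// {
+    ///     // Refuse the mutation and name `peer.id` so the operator can
+    ///     // decide that chunk first.
+    /// }
+    /// ```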
+    pub async fn find_other_approved_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+        exclude_chunk_id: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>> {
+        let rows = sqlx::query(
+            r"
+SELECT id, scope, status, hit_count, payload, created_at_ms, updated_at_ms
+FROM objects
+WHERE object_type = $1 AND scope = $2 AND status = 'approved' AND id != $3
+",
+        )
+        .bind(DRAFT_CHUNK_OBJECT_TYPE)
+        .bind(sandbox_id)
+        .bind(exclude_chunk_id)
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|e| map_db_error(&e))?;
+
+        for row in rows {
+            let record = row_to_draft_chunk_record(row)?;
+            if record.host == host && record.port == port && record.binary == binary {
+                return Ok(Some(record));
+            }
+        }
+        Ok(None)
+    }
+
     pub async fn list_draft_chunks(
         &self,
         sandbox_id: &str,
@@ -492,11 +554,12 @@ ORDER BY created_at_ms DESC
         record.decided_at_ms = decided_at_ms;
         record.last_seen_ms = current_time_ms()?;
         let payload = draft_chunk_payload_from_record(&record)?;
+        let dedup_key = draft_chunk_dedup_key(&record);
 
         let result = sqlx::query(
             r"
 UPDATE objects
-SET status = $3, payload = $4, updated_at_ms = $5
+SET status = $3, payload = $4, updated_at_ms = $5, dedup_key = $6
 WHERE object_type = $1 AND id = $2
 ",
         )
@@ -505,6 +568,7 @@ WHERE object_type = $1 AND id = $2
         .bind(status)
         .bind(payload)
         .bind(record.last_seen_ms)
+        .bind(dedup_key)
         .execute(&self.pool)
         .await
         .map_err(|e| map_db_error(&e))?;
@@ -597,8 +661,20 @@ WHERE object_type = $1 AND scope = $2
     }
 }
 
-fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> String {
-    format!("{}|{}|{}", chunk.host, chunk.port, chunk.binary)
+fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> Option<String> {
+    draft_chunk_dedup_key_for_status(&chunk.status, &chunk.host, chunk.port, &chunk.binary)
+}
+
+fn draft_chunk_dedup_key_for_status(
+    status: &str,
+    host: &str,
+    port: i32,
+    binary: &str,
+) -> Option<String> {
+    // Only pending chunks participate in dedup. Approved and rejected chunks
+    // get NULL so they don't absorb future submissions for the same
+    // (host, port, binary) — see issue #1245.
+    (status == "pending").then(|| format!("{host}|{port}|{binary}"))
 }
 
 fn row_to_object_record(row: sqlx::postgres::PgRow) -> ObjectRecord {
diff --git a/crates/openshell-server/src/persistence/sqlite.rs b/crates/openshell-server/src/persistence/sqlite.rs
index fafb07597..f6d9f2aba 100644
--- a/crates/openshell-server/src/persistence/sqlite.rs
+++ b/crates/openshell-server/src/persistence/sqlite.rs
@@ -24,6 +24,13 @@ pub struct SqliteStore {
 }
 
 impl SqliteStore {
+    /// Test-only accessor for raw pool access (e.g., installing failure
+    /// triggers to drive fault-injection tests in sibling modules).
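+    ///
+    /// Sketch of the intended use from a test in this diff (assumes a
+    /// sqlite-backed store):
+    ///
+    /// ```ignore
+    /// let pool = store.pool().clone();
+    /// sqlx::query("DROP TRIGGER fail_next_policy_revision")
+    ///     .execute(&pool)
+    ///     .await?;
+    /// ```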
+    #[cfg(test)]
+    pub(crate) fn pool(&self) -> &SqlitePool {
+        &self.pool
+    }
+
     pub async fn connect(url: &str) -> PersistenceResult<Self> {
         let is_in_memory = url.contains(":memory:") || url.contains("mode=memory");
         let max_connections = if is_in_memory { 1 } else { 5 };
@@ -455,6 +462,68 @@ WHERE "object_type" = ?1 AND "id" = ?2
         row.map(row_to_draft_chunk_record).transpose()
     }
 
+    pub async fn find_pending_draft_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>> {
+        let dedup_key = draft_chunk_dedup_key_for_status("pending", host, port, binary);
+        let row = sqlx::query(
+            r#"
+SELECT "id", "scope", "status", "hit_count", "payload", "created_at_ms", "updated_at_ms"
+FROM "objects"
+WHERE "object_type" = ?1 AND "scope" = ?2 AND "status" = 'pending' AND "dedup_key" = ?3
+"#,
+        )
+        .bind(DRAFT_CHUNK_OBJECT_TYPE)
+        .bind(sandbox_id)
+        .bind(dedup_key)
+        .fetch_optional(&self.pool)
+        .await
+        .map_err(|e| map_db_error(&e))?;
+
+        row.map(row_to_draft_chunk_record).transpose()
+    }
+
+    /// Find any other approved draft chunk for `(sandbox_id, host, port, binary)`,
+    /// excluding the chunk identified by `exclude_chunk_id`. Approved chunks
+    /// have `dedup_key = NULL` (issue #1245), so the partial unique index does
+    /// not constrain them — multiple approved chunks can coexist for the same
+    /// key. Callers that intend to mutate a rule contributed to by one chunk
+    /// use this to detect when another decided chunk is also contributing.
+    pub async fn find_other_approved_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+        exclude_chunk_id: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>> {
+        let rows = sqlx::query(
+            r#"
+SELECT "id", "scope", "status", "hit_count", "payload", "created_at_ms", "updated_at_ms"
+FROM "objects"
+WHERE "object_type" = ?1 AND "scope" = ?2 AND "status" = 'approved' AND "id" != ?3
+"#,
+        )
+        .bind(DRAFT_CHUNK_OBJECT_TYPE)
+        .bind(sandbox_id)
+        .bind(exclude_chunk_id)
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|e| map_db_error(&e))?;
+
+        for row in rows {
+            let record = row_to_draft_chunk_record(row)?;
+            if record.host == host && record.port == port && record.binary == binary {
+                return Ok(Some(record));
+            }
+        }
+        Ok(None)
+    }
+
     pub async fn list_draft_chunks(
         &self,
         sandbox_id: &str,
@@ -507,11 +576,12 @@ ORDER BY "created_at_ms" DESC
         record.decided_at_ms = decided_at_ms;
         record.last_seen_ms = current_time_ms()?;
         let payload = draft_chunk_payload_from_record(&record)?;
+        let dedup_key = draft_chunk_dedup_key(&record);
 
         let result = sqlx::query(
             r#"
 UPDATE "objects"
-SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5
+SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5, "dedup_key" = ?6
 WHERE "object_type" = ?1 AND "id" = ?2
 "#,
         )
@@ -520,6 +590,7 @@ WHERE "object_type" = ?1 AND "id" = ?2
         .bind(status)
         .bind(payload)
         .bind(record.last_seen_ms)
+        .bind(dedup_key)
         .execute(&self.pool)
         .await
         .map_err(|e| map_db_error(&e))?;
@@ -612,8 +683,20 @@ WHERE "object_type" = ?1 AND "scope" = ?2
     }
 }
 
-fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> String {
-    format!("{}|{}|{}", chunk.host, chunk.port, chunk.binary)
+fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> Option<String> {
+    draft_chunk_dedup_key_for_status(&chunk.status, &chunk.host, chunk.port, &chunk.binary)
+}
+
+fn draft_chunk_dedup_key_for_status(
+    status: &str,
+    host: &str,
+    port: i32,
+    binary: &str,
+) -> Option<String> {
+    // Only pending chunks participate in dedup. Approved and rejected chunks
+    // get NULL so they don't absorb future submissions for the same
+    // (host, port, binary) — see issue #1245.
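+    //
+    // For example, ("pending", "api.example.com", 443, "/usr/bin/curl")
+    // yields Some("api.example.com|443|/usr/bin/curl"); any decided status
+    // yields None.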
+    (status == "pending").then(|| format!("{host}|{port}|{binary}"))
 }
 
 fn row_to_object_record(row: sqlx::sqlite::SqliteRow) -> ObjectRecord {
@@ -656,3 +739,280 @@ fn row_to_draft_chunk_record(row: sqlx::sqlite::SqliteRow) -> PersistenceResult<
         updated_at_ms,
     )
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    async fn fetch_dedup_key(store: &SqliteStore, id: &str) -> Option<String> {
+        sqlx::query_scalar(r#"SELECT "dedup_key" FROM "objects" WHERE "id" = ?1"#)
+            .bind(id)
+            .fetch_one(&store.pool)
+            .await
+            .unwrap()
+    }
+
+    fn make_chunk(id: &str, sandbox_id: &str, status: &str) -> DraftChunkRecord {
+        DraftChunkRecord {
+            id: id.to_string(),
+            sandbox_id: sandbox_id.to_string(),
+            draft_version: 1,
+            status: status.to_string(),
+            rule_name: "allow_internal_api".to_string(),
+            proposed_rule: Vec::new(),
+            rationale: "test".to_string(),
+            security_notes: String::new(),
+            confidence: 0.9,
+            created_at_ms: 100,
+            decided_at_ms: None,
+            host: "internal-api.example.com".to_string(),
+            port: 443,
+            binary: "/usr/bin/curl".to_string(),
+            hit_count: 1,
+            first_seen_ms: 100,
+            last_seen_ms: 100,
+        }
+    }
+
+    /// Pending chunks carry a `dedup_key`; approving must clear it so a fresh
+    /// pending submission for the same key can land as a new row. Issue #1245.
+    #[tokio::test]
+    async fn approving_pending_chunk_clears_dedup_key() {
+        let store = SqliteStore::connect("sqlite::memory:").await.unwrap();
+        store.migrate().await.unwrap();
+
+        let chunk = make_chunk("chunk-1", "sb-1", "pending");
+        store.put_draft_chunk(&chunk).await.unwrap();
+        assert_eq!(
+            fetch_dedup_key(&store, "chunk-1").await,
+            Some("internal-api.example.com|443|/usr/bin/curl".to_string()),
+            "pending chunk must hold its dedup slot"
+        );
+
+        store
+            .update_draft_chunk_status("chunk-1", "approved", Some(200))
+            .await
+            .unwrap();
+        assert_eq!(
+            fetch_dedup_key(&store, "chunk-1").await,
+            None,
+            "approved chunk must release its dedup slot"
+        );
+    }
+
+    /// Rejected chunks must also release their dedup slot. Otherwise a future
+    /// denial for the same (host, port, binary) is silently absorbed.
+    #[tokio::test]
+    async fn rejecting_pending_chunk_clears_dedup_key() {
+        let store = SqliteStore::connect("sqlite::memory:").await.unwrap();
+        store.migrate().await.unwrap();
+
+        let chunk = make_chunk("chunk-1", "sb-1", "pending");
+        store.put_draft_chunk(&chunk).await.unwrap();
+
+        store
+            .update_draft_chunk_status("chunk-1", "rejected", Some(200))
+            .await
+            .unwrap();
+        assert_eq!(
+            fetch_dedup_key(&store, "chunk-1").await,
+            None,
+            "rejected chunk must release its dedup slot"
+        );
+    }
+
+    /// `find_pending_draft_chunk_for_key` returns the pending peer for a
+    /// (sandbox, host, port, binary) tuple and ignores decided chunks.
+    #[tokio::test]
+    async fn find_pending_draft_chunk_for_key_returns_pending_peer() {
+        let store = SqliteStore::connect("sqlite::memory:").await.unwrap();
+        store.migrate().await.unwrap();
+
+        let approved = make_chunk("chunk-approved", "sb-1", "pending");
+        store.put_draft_chunk(&approved).await.unwrap();
+        store
+            .update_draft_chunk_status("chunk-approved", "approved", Some(150))
+            .await
+            .unwrap();
+
+        // No peer yet — only the now-approved chunk exists.
+        assert!(
+            store
+                .find_pending_draft_chunk_for_key(
+                    "sb-1",
+                    "internal-api.example.com",
+                    443,
+                    "/usr/bin/curl"
+                )
+                .await
+                .unwrap()
+                .is_none()
+        );
+
+        let pending = make_chunk("chunk-pending", "sb-1", "pending");
+        store.put_draft_chunk(&pending).await.unwrap();
+        let peer = store
+            .find_pending_draft_chunk_for_key(
+                "sb-1",
+                "internal-api.example.com",
+                443,
+                "/usr/bin/curl",
+            )
+            .await
+            .unwrap()
+            .expect("pending peer must be found");
+        assert_eq!(peer.id, "chunk-pending");
+    }
+
+    /// `find_other_approved_chunk_for_key` filters approved rows in Rust by
+    /// deserializing each payload, so a wrong field comparison would silently
+    /// match unrelated rows and block legitimate operator actions. Cover the
+    /// four near-miss axes (sandbox, host, port, binary) plus the
+    /// exclude-self guard and a positive match.
+    #[tokio::test]
+    async fn find_other_approved_chunk_for_key_ignores_unrelated_chunks() {
+        async fn put_approved(store: &SqliteStore, chunk: DraftChunkRecord) {
+            let id = chunk.id.clone();
+            store.put_draft_chunk(&chunk).await.unwrap();
+            store
+                .update_draft_chunk_status(&id, "approved", Some(200))
+                .await
+                .unwrap();
+        }
+
+        let store = SqliteStore::connect("sqlite::memory:").await.unwrap();
+        store.migrate().await.unwrap();
+
+        // Self — must be excluded by id.
+        put_approved(&store, make_chunk("chunk-self", "sb-1", "pending")).await;
+        // Different sandbox.
+        put_approved(&store, make_chunk("chunk-other-sandbox", "sb-2", "pending")).await;
+        // Different host.
+        let mut other_host = make_chunk("chunk-other-host", "sb-1", "pending");
+        other_host.host = "different-host.example.com".to_string();
+        put_approved(&store, other_host).await;
+        // Different port.
+        let mut other_port = make_chunk("chunk-other-port", "sb-1", "pending");
+        other_port.port = 8443;
+        put_approved(&store, other_port).await;
+        // Different binary.
+        let mut other_binary = make_chunk("chunk-other-binary", "sb-1", "pending");
+        other_binary.binary = "/usr/bin/wget".to_string();
+        put_approved(&store, other_binary).await;
+
+        // Nothing else approved matches → None.
+        assert!(
+            store
+                .find_other_approved_chunk_for_key(
+                    "sb-1",
+                    "internal-api.example.com",
+                    443,
+                    "/usr/bin/curl",
+                    "chunk-self",
+                )
+                .await
+                .unwrap()
+                .is_none(),
+            "must not match different sandbox/host/port/binary or self"
+        );
+
+        // Add a true peer and confirm it is returned.
+        put_approved(&store, make_chunk("chunk-peer", "sb-1", "pending")).await;
+        let peer = store
+            .find_other_approved_chunk_for_key(
+                "sb-1",
+                "internal-api.example.com",
+                443,
+                "/usr/bin/curl",
+                "chunk-self",
+            )
+            .await
+            .unwrap()
+            .expect("matching approved peer must be returned");
+        assert_eq!(peer.id, "chunk-peer");
+
+        // Excluding the peer instead returns self (the only remaining match).
+        let self_match = store
+            .find_other_approved_chunk_for_key(
+                "sb-1",
+                "internal-api.example.com",
+                443,
+                "/usr/bin/curl",
+                "chunk-peer",
+            )
+            .await
+            .unwrap()
+            .expect("with the peer excluded, self must match");
+        assert_eq!(self_match.id, "chunk-self");
+    }
+
+    /// Migration 005 clears `dedup_key` from any decided rows seeded before
+    /// the runtime fix landed, but must leave pending rows alone. The test
+    /// loads the migration SQL directly from the file (via `include_str!`)
+    /// so drift between the file and the test's expectations would be
+    /// caught — and seeds one row of each status, including a pending row
+    /// that the migration must NOT touch.
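+    ///
+    /// Expected post-migration state (this mirrors the assertions below):
+    ///
+    /// ```text
+    /// legacy-approved    dedup_key -> NULL
+    /// legacy-rejected    dedup_key -> NULL
+    /// inflight-pending   dedup_key -> unchanged
+    /// ```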
+    #[tokio::test]
+    async fn migration_clears_dedup_key_on_legacy_decided_rows() {
+        const MIGRATION_005: &str =
+            include_str!("../../migrations/sqlite/005_clear_dedup_for_decided_chunks.sql");
+
+        let store = SqliteStore::connect("sqlite::memory:").await.unwrap();
+        store.migrate().await.unwrap();
+
+        let seed = |id: &'static str, status: &'static str, dedup: &'static str| {
+            let pool = store.pool.clone();
+            async move {
+                sqlx::query(
+                    r#"
+INSERT INTO "objects" (
+    "id", "object_type", "scope", "status", "dedup_key",
+    "hit_count", "payload", "created_at_ms", "updated_at_ms"
+)
+VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
+"#,
+                )
+                .bind(id)
+                .bind(DRAFT_CHUNK_OBJECT_TYPE)
+                .bind("sb-legacy")
+                .bind(status)
+                .bind(dedup)
+                .bind(1_i64)
+                .bind(Vec::<u8>::new())
+                .bind(100_i64)
+                .bind(100_i64)
+                .execute(&pool)
+                .await
+                .unwrap();
+            }
+        };
+
+        // Pre-fix gateway: every chunk carries a dedup_key regardless of
+        // status. Use distinct keys so all three rows can coexist under the
+        // partial unique index.
+        seed("legacy-approved", "approved", "host-a|443|/usr/bin/curl").await;
+        seed("legacy-rejected", "rejected", "host-r|443|/usr/bin/curl").await;
+        seed("inflight-pending", "pending", "host-p|443|/usr/bin/curl").await;
+
+        sqlx::raw_sql(MIGRATION_005)
+            .execute(&store.pool)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            fetch_dedup_key(&store, "legacy-approved").await,
+            None,
+            "migration must clear dedup_key on approved legacy rows"
+        );
+        assert_eq!(
+            fetch_dedup_key(&store, "legacy-rejected").await,
+            None,
+            "migration must clear dedup_key on rejected legacy rows"
+        );
+        assert_eq!(
+            fetch_dedup_key(&store, "inflight-pending").await,
+            Some("host-p|443|/usr/bin/curl".to_string()),
+            "migration must NOT clear dedup_key on in-flight pending rows"
+        );
+    }
+}
diff --git a/crates/openshell-server/src/policy_store.rs b/crates/openshell-server/src/policy_store.rs
index f0a43698e..3c3c4bd3d 100644
--- a/crates/openshell-server/src/policy_store.rs
+++ b/crates/openshell-server/src/policy_store.rs
@@ -58,6 +58,23 @@ pub trait PolicyStoreExt {
     async fn get_draft_chunk(&self, id: &str) -> PersistenceResult<Option<DraftChunkRecord>>;
 
+    async fn find_pending_draft_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>>;
+
+    async fn find_other_approved_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+        exclude_chunk_id: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>>;
+
     async fn list_draft_chunks(
         &self,
         sandbox_id: &str,
@@ -200,6 +217,61 @@ impl PolicyStoreExt for Store {
         }
     }
 
+    async fn find_pending_draft_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>> {
+        match self {
+            Self::Postgres(store) => {
+                store
+                    .find_pending_draft_chunk_for_key(sandbox_id, host, port, binary)
+                    .await
+            }
+            Self::Sqlite(store) => {
+                store
+                    .find_pending_draft_chunk_for_key(sandbox_id, host, port, binary)
+                    .await
+            }
+        }
+    }
+
+    async fn find_other_approved_chunk_for_key(
+        &self,
+        sandbox_id: &str,
+        host: &str,
+        port: i32,
+        binary: &str,
+        exclude_chunk_id: &str,
+    ) -> PersistenceResult<Option<DraftChunkRecord>> {
+        match self {
+            Self::Postgres(store) => {
+                store
+                    .find_other_approved_chunk_for_key(
+                        sandbox_id,
+                        host,
+                        port,
+                        binary,
+                        exclude_chunk_id,
+                    )
+                    .await
+            }
+            Self::Sqlite(store) => {
+                store
+                    .find_other_approved_chunk_for_key(
+                        sandbox_id,
+                        host,
+                        port,
+                        binary,
+                        exclude_chunk_id,
+                    )
+                    .await
+            }
+        }
+    }
+
     async fn list_draft_chunks(
         &self,
         sandbox_id: &str,