From e59ed96f3e976306f7f67bd93c13688dd58c4c76 Mon Sep 17 00:00:00 2001 From: Max Dubrinsky Date: Mon, 11 May 2026 18:58:02 -0400 Subject: [PATCH] fix(server): release dedup slot on decided draft chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A hostname rule's `allowed_ips` allowlist can go stale when DNS resolves to a different backend. Today the proxy denies the connection with `engine:ssrf`, but the mechanistic mapper's draft chunk for the same `(host, port, binary)` is silently absorbed by the already-approved row's dedup `ON CONFLICT` — so the TUI shows nothing pending and the operator has no in-product signal. Only `pending` draft chunks now hold a `dedup_key`. Approving or rejecting a chunk releases the slot, so a fresh denial for the same destination surfaces as a new pending chunk carrying the newly observed details. Migration 005 clears the key on any pre-existing decided rows so the fix takes effect on existing gateways. Releasing the slot lets decided peers coexist with new pending rows for the same `(host, port, binary)`. To prevent the resulting last-writer- wins and orphan-rule hazards, approve/reject/undo now refuse when another chunk contributes to the same rule and name the conflicting peer in the error. Undo flips status first so the partial unique index detects the collision, and rolls back to `approved` if the policy mutation itself fails (preventing a pending chunk with a still-live rule). Closes #1245 Signed-off-by: Max Dubrinsky --- architecture/gateway.md | 2 +- architecture/security-policy.md | 13 + .../005_clear_dedup_for_decided_chunks.sql | 10 + .../005_clear_dedup_for_decided_chunks.sql | 10 + crates/openshell-server/src/grpc/policy.rs | 2213 ++++++++++++++++- .../src/persistence/postgres.rs | 82 +- .../src/persistence/sqlite.rs | 366 ++- crates/openshell-server/src/policy_store.rs | 72 + 8 files changed, 2753 insertions(+), 15 deletions(-) create mode 100644 crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql create mode 100644 crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql diff --git a/architecture/gateway.md b/architecture/gateway.md index d89706e64..c5fd349c7 100644 --- a/architecture/gateway.md +++ b/architecture/gateway.md @@ -74,7 +74,7 @@ The storage schema is intentionally narrow: | `scope` | Optional owner or namespace for scoped/versioned records, such as a sandbox ID for policy revisions. | | `version` | Optional monotonically increasing version for scoped records. | | `status` | Optional workflow state for records such as policy revisions or draft policy chunks. | -| `dedup_key` and `hit_count` | Optional policy-advisor fields for coalescing repeated observations. | +| `dedup_key` and `hit_count` | Optional policy-advisor fields for coalescing repeated observations. Draft policy chunks only set `dedup_key` while pending; the slot is released when the draft is approved or rejected so a later denial for the same destination can surface as a new pending draft. | | `payload` | Prost-encoded protobuf payload for the full domain object. | | `created_at_ms` and `updated_at_ms` | Gateway timestamps used for ordering and list output. | | `labels` | JSON object carrying Kubernetes-style object labels for filtering and organization. 
| diff --git a/architecture/security-policy.md b/architecture/security-policy.md index e5f179dc1..933f5ecf1 100644 --- a/architecture/security-policy.md +++ b/architecture/security-policy.md @@ -72,6 +72,19 @@ recommendations: 4. A human or admin workflow approves or rejects drafts. 5. Approved drafts merge into the target sandbox policy. +Drafts dedup on `(sandbox, host, port, binary)` while they are pending so repeat +denials accumulate hits on a single recommendation. Once a draft is decided +(approved or rejected) it releases its dedup slot. A fresh denial against the +same destination — for example, a hostname rule with stale `allowed_ips` after +DNS resolves to a new backend — therefore surfaces as a new pending draft +carrying the newly observed details, rather than being silently absorbed by the +existing decision. + +Because decided drafts coexist with pending peers for the same destination, the +gateway refuses approve, reject, and undo operations that would otherwise +overwrite or strip a rule another draft contributes to. The error names the +conflicting peer so the operator can decide it first. + The advisor should propose narrow additions and preserve explicit-deny behavior. It is a workflow aid, not an automatic permission grant. diff --git a/crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql b/crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql new file mode 100644 index 000000000..f4fed4d2e --- /dev/null +++ b/crates/openshell-server/migrations/postgres/005_clear_dedup_for_decided_chunks.sql @@ -0,0 +1,10 @@ +-- Issue #1245: only pending draft chunks should hold a dedup_key. Approved +-- and rejected chunks held the same (host, port, binary) key as pending ones, +-- so a fresh denial submission for the same key was silently absorbed by the +-- decided row's ON CONFLICT update. Clear the key on existing decided rows +-- so future submissions can insert as new pending chunks. +UPDATE objects + SET dedup_key = NULL + WHERE object_type = 'draft_policy_chunk' + AND status IN ('approved', 'rejected') + AND dedup_key IS NOT NULL; diff --git a/crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql b/crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql new file mode 100644 index 000000000..f4fed4d2e --- /dev/null +++ b/crates/openshell-server/migrations/sqlite/005_clear_dedup_for_decided_chunks.sql @@ -0,0 +1,10 @@ +-- Issue #1245: only pending draft chunks should hold a dedup_key. Approved +-- and rejected chunks held the same (host, port, binary) key as pending ones, +-- so a fresh denial submission for the same key was silently absorbed by the +-- decided row's ON CONFLICT update. Clear the key on existing decided rows +-- so future submissions can insert as new pending chunks. 
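+-- The slot itself is assumed to be enforced by a partial unique index over
+-- held keys, roughly:
+--   CREATE UNIQUE INDEX objects_dedup_uq
+--       ON objects (dedup_key)
+--       WHERE dedup_key IS NOT NULL;
+-- (illustrative sketch; the actual DDL lives in an earlier migration). That
+-- is why NULL-ing the key on decided rows frees it for new pending inserts.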
+UPDATE objects + SET dedup_key = NULL + WHERE object_type = 'draft_policy_chunk' + AND status IN ('approved', 'rejected') + AND dedup_key IS NOT NULL; diff --git a/crates/openshell-server/src/grpc/policy.rs b/crates/openshell-server/src/grpc/policy.rs index 2c62c930a..3f7cbf01a 100644 --- a/crates/openshell-server/src/grpc/policy.rs +++ b/crates/openshell-server/src/grpc/policy.rs @@ -53,7 +53,7 @@ use std::collections::{BTreeMap, HashMap}; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; use tonic::{Request, Response, Status}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use super::validation::{ level_matches, source_matches, validate_policy_safety, validate_static_fields_unchanged, @@ -1470,6 +1470,50 @@ pub(super) async fn handle_approve_draft_chunk( ))); } + // After issue #1245, a rejected chunk and a pending peer can coexist for + // the same (host, port, binary). Approving the rejected chunk would push + // its (possibly stale) proposed_rule into the live policy while leaving + // the newer pending peer queued — a subsequent approve of that peer then + // overwrites the rule with last-writer-wins semantics. Require the + // operator to decide the peer first so the approval reflects the latest + // observed proposal rather than an order-dependent merge. + if chunk.status == "rejected" { + if let Some(peer) = state + .store + .find_pending_draft_chunk_for_key(&sandbox_id, &chunk.host, chunk.port, &chunk.binary) + .await + .map_err(|e| Status::internal(format!("find pending peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot approve rejected chunk {}: a pending chunk ({}) already exists for \ + the same destination ({}:{} from {}). Decide that pending chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + // Another approved chunk for the same key can also coexist post-#1245. + // Approving this rejected chunk would push its (possibly stale) rule + // through `merge_chunk_into_policy`, overwriting the peer's contribution + // with last-writer-wins semantics. + if let Some(peer) = state + .store + .find_other_approved_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + &req.chunk_id, + ) + .await + .map_err(|e| Status::internal(format!("find approved peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot approve rejected chunk {}: another approved chunk ({}) is already \ + active for the rule at {}:{} from {}. Undo or reject that chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + } + info!( sandbox_id = %sandbox_id, chunk_id = %req.chunk_id, @@ -1571,6 +1615,46 @@ pub(super) async fn handle_reject_draft_chunk( if was_approved { require_no_global_policy(state).await?; + // A pending peer for the same (host, port, binary) is reachable after + // issue #1245: approving a chunk releases its dedup slot, letting a + // fresh denial surface as a new pending row. Rejecting the approved + // chunk while that peer exists would strip the rule from policy but + // leave the peer queued — approving it later silently re-installs the + // rule the operator just rejected. Require the operator to decide + // the peer first so the intent is unambiguous. + if let Some(peer) = state + .store + .find_pending_draft_chunk_for_key(&sandbox_id, &chunk.host, chunk.port, &chunk.binary) + .await + .map_err(|e| Status::internal(format!("find pending peer failed: {e}")))? 
+ { + return Err(Status::failed_precondition(format!( + "cannot reject approved chunk {}: a pending chunk ({}) already exists for \ + the same destination ({}:{} from {}). Decide that pending chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + // Another approved chunk for the same key can also coexist post-#1245. + // `remove_chunk_from_policy` strips by rule_name+binary_path, so it + // would also remove the peer's contribution. Refuse and point at it. + if let Some(peer) = state + .store + .find_other_approved_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + &req.chunk_id, + ) + .await + .map_err(|e| Status::internal(format!("find approved peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot reject approved chunk {}: another approved chunk ({}) also \ + contributes to the rule at {}:{} from {}. Undo or reject that chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } let (version, hash) = remove_chunk_from_policy(state, &sandbox_id, &chunk).await?; emit_gateway_policy_audit_log( &sandbox_id, @@ -1803,6 +1887,65 @@ pub(super) async fn handle_undo_draft_chunk( ))); } + // Another approved chunk for the same (host, port, binary) can coexist + // after issue #1245 (decided chunks release the dedup slot). Since + // `remove_chunk_from_policy` strips by rule_name + binary_path rather than + // chunk identity, undoing this chunk would also strip the rule the other + // approved chunk contributed. Refuse and point the operator at the peer. + if let Some(peer) = state + .store + .find_other_approved_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + &req.chunk_id, + ) + .await + .map_err(|e| Status::internal(format!("find approved peer failed: {e}")))? + { + return Err(Status::failed_precondition(format!( + "cannot undo approved chunk {}: another approved chunk ({}) also \ + contributes to the rule at {}:{} from {}. Undo or reject that chunk first.", + req.chunk_id, peer.id, chunk.host, chunk.port, chunk.binary, + ))); + } + + // Flip the chunk back to pending before touching the active policy. A + // concurrent denial can land a pending peer at any time, and reverting an + // approved chunk to pending recomputes its dedup_key — the collision must + // be detected on the partial unique index, not via a preflight that races + // the subsequent UPDATE. If the UPDATE succeeds we proceed to mutate + // policy; if it fails with a dedup-slot conflict we surface a clean + // FailedPrecondition without having committed any policy revision (#1245). + if let Err(e) = state + .store + .update_draft_chunk_status(&req.chunk_id, "pending", None) + .await + { + if e.is_unique_violation_on("objects_dedup_uq") { + let peer_descriptor = state + .store + .find_pending_draft_chunk_for_key( + &sandbox_id, + &chunk.host, + chunk.port, + &chunk.binary, + ) + .await + .ok() + .flatten() + .map(|peer| format!(" ({})", peer.id)) + .unwrap_or_default(); + return Err(Status::failed_precondition(format!( + "cannot undo approved chunk {}: a pending chunk{} already exists for \ + the same destination ({}:{} from {}). 
Decide that pending chunk first.", + req.chunk_id, peer_descriptor, chunk.host, chunk.port, chunk.binary, + ))); + } + return Err(Status::internal(format!("update chunk status failed: {e}"))); + } + info!( sandbox_id = %sandbox_id, chunk_id = %req.chunk_id, @@ -1812,13 +1955,34 @@ pub(super) async fn handle_undo_draft_chunk( "UndoDraftChunk: removing rule from active policy" ); - let (version, hash) = remove_chunk_from_policy(state, &sandbox_id, &chunk).await?; - - state - .store - .update_draft_chunk_status(&req.chunk_id, "pending", None) - .await - .map_err(|e| Status::internal(format!("update chunk status failed: {e}")))?; + let (version, hash) = match remove_chunk_from_policy(state, &sandbox_id, &chunk).await { + Ok(result) => result, + Err(policy_err) => { + // The status flip already committed. If we propagate the error + // without rolling back, the chunk is stuck in `pending` while the + // rule is still live — and a subsequent reject takes the simple + // (was_approved=false) branch that never touches policy, locking + // the rule in place. Best-effort rollback to `approved` with the + // original `decided_at_ms`. Rollback can itself collide on the + // dedup index if a fresh denial raced in between the two writes; + // log loudly in that case so operators can grep for divergence. + if let Err(rollback_err) = state + .store + .update_draft_chunk_status(&req.chunk_id, "approved", chunk.decided_at_ms) + .await + { + error!( + sandbox_id = %sandbox_id, + chunk_id = %req.chunk_id, + policy_err = %policy_err, + rollback_err = %rollback_err, + "UndoDraftChunk: rule removal failed AND status rollback failed — \ + chunk left in pending state with rule still live in policy" + ); + } + return Err(policy_err); + } + }; state.sandbox_watch_bus.notify(&sandbox_id); emit_gateway_policy_audit_log( @@ -4987,4 +5151,2037 @@ mod tests { assert_eq!(err.code(), Code::InvalidArgument); assert!(err.message().contains("unknown setting key")); } + + /// A fresh denial against an existing approved rule (e.g. hostname rule + /// with stale `allowed_ips`) must surface as a new pending chunk so the + /// operator can extend the rule. Approved chunks have `dedup_key = NULL`, + /// so they don't collide with new pending submissions for the same + /// `(host, port, binary)`. See issue #1245. + #[tokio::test] + async fn denial_for_existing_approved_rule_should_surface_pending_chunk() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-stale-allowed-ips".to_string(), + name: "stale-allowed-ips".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let proposed_rule = NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec!["10.0.5.10".to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Step 1: first denial → mapper proposes a chunk → operator approves. 
+        let submit = handle_submit_policy_analysis(
+            &state,
+            Request::new(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_internal_api".to_string(),
+                    proposed_rule: Some(proposed_rule.clone()),
+                    rationale: "first denial".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 100,
+                    last_seen_ms: 100,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        assert_eq!(submit.accepted_chunks, 1);
+
+        let pending = handle_get_draft_policy(
+            &state,
+            Request::new(GetDraftPolicyRequest {
+                name: sandbox_name.clone(),
+                status_filter: "pending".to_string(),
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        assert_eq!(pending.chunks.len(), 1);
+        let chunk_id = pending.chunks[0].id.clone();
+
+        handle_approve_draft_chunk(
+            &state,
+            Request::new(ApproveDraftChunkRequest {
+                name: sandbox_name.clone(),
+                chunk_id: chunk_id.clone(),
+            }),
+        )
+        .await
+        .unwrap();
+
+        // After approve, the rule is live with `allowed_ips: ["10.0.5.10"]`.
+        // No pending chunks — confirmed baseline.
+        let pending_after_approve = handle_get_draft_policy(
+            &state,
+            Request::new(GetDraftPolicyRequest {
+                name: sandbox_name.clone(),
+                status_filter: "pending".to_string(),
+            }),
+        )
+        .await
+        .unwrap()
+        .into_inner();
+        assert_eq!(pending_after_approve.chunks.len(), 0);
+
+        // Step 2: time passes, DNS for internal-api.example.com flips to a
+        // different backend (e.g. 10.0.5.99), the proxy denies the connection
+        // because the resolved IP is not in `allowed_ips`, and the mechanistic
+        // mapper generates a fresh proposal for the SAME (host, port, binary).
+        // The proposal here would carry the *new* allowed_ips it observed.
+        let fresh_proposed_rule = NetworkPolicyRule {
+            name: "allow_internal_api".to_string(),
+            endpoints: vec![NetworkEndpoint {
+                host: "internal-api.example.com".to_string(),
+                port: 443,
+                allowed_ips: vec!["10.0.5.99".to_string()],
+                ..Default::default()
+            }],
+            binaries: vec![NetworkBinary {
+                path: "/usr/bin/curl".to_string(),
+                ..Default::default()
+            }],
+        };
+
+        handle_submit_policy_analysis(
+            &state,
+            Request::new(SubmitPolicyAnalysisRequest {
+                name: sandbox_name.clone(),
+                proposed_chunks: vec![PolicyChunk {
+                    rule_name: "allow_internal_api".to_string(),
+                    proposed_rule: Some(fresh_proposed_rule),
+                    rationale: "denial after DNS flip — resolved IP not in allowed_ips".to_string(),
+                    confidence: 0.9,
+                    hit_count: 1,
+                    first_seen_ms: 200,
+                    last_seen_ms: 200,
+                    binary: "/usr/bin/curl".to_string(),
+                    ..Default::default()
+                }],
+                ..Default::default()
+            }),
+        )
+        .await
+        .unwrap();
+
+        // Step 3: operator opens the TUI / queries draft policy expecting to
+        // see something to act on. Today, this assertion fails: zero pending
+        // chunks — the dedup `ON CONFLICT` only bumped hit_count on the
+        // already-approved chunk, so the TUI's pending-rule surface stays
+        // silent and the operator has no in-product signal.
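+        // For reference, the absorbing upsert is assumed to be shaped roughly
+        // like:
+        //   INSERT INTO objects (..., dedup_key, hit_count, ...)
+        //   VALUES (...)
+        //   ON CONFLICT (dedup_key) DO UPDATE
+        //       SET hit_count = objects.hit_count + 1;
+        // while the approved row still held the key, the fresh proposal's
+        // payload (the new allowed_ips) never landed as a row of its own.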
+ let pending_after_second_denial = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name, + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert!( + !pending_after_second_denial.chunks.is_empty(), + "expected a pending chunk to surface the stale-allowed_ips denial, \ + but got zero — operator has no in-product signal" + ); + // The fix isn't just "a chunk surfaces" — the surfaced chunk must + // carry the *new* allowed_ips so the operator can extend the rule. + let surfaced_rule = pending_after_second_denial.chunks[0] + .proposed_rule + .as_ref() + .expect("surfaced chunk must include a proposed rule"); + let surfaced_allowed_ips: Vec<&str> = surfaced_rule.endpoints[0] + .allowed_ips + .iter() + .map(String::as_str) + .collect(); + assert_eq!( + surfaced_allowed_ips, + vec!["10.0.5.99"], + "surfaced pending chunk must reflect the new resolved IP, not the original" + ); + } + + /// Issue #1245 follow-up: once a fresh denial against an existing approved + /// rule surfaces as a pending peer, undoing the approved chunk would + /// recompute the same `dedup_key` and collide on the partial unique index. + /// The handler must reject the undo with `FailedPrecondition` BEFORE + /// touching the active policy, leaving DB and policy state consistent. + #[tokio::test] + async fn undo_approved_chunk_with_competing_pending_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-undo-collision".to_string(), + name: "undo-collision".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + let sandbox_id = sandbox.object_id().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Submit + approve the first denial. 
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + + let approved_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap(); + + // A fresh denial creates a pending peer for the same (host, port, binary). + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "denial after VPN flip".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + + let pending_after_resubmit = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(pending_after_resubmit.chunks.len(), 1); + let peer_chunk_id = pending_after_resubmit.chunks[0].id.clone(); + assert_ne!(peer_chunk_id, approved_chunk_id); + + let revisions_before_undo = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let live_version_before = revisions_before_undo.revisions[0].version; + + // Undo must refuse before mutating policy. + let undo_err = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(undo_err.code(), Code::FailedPrecondition); + assert!( + undo_err.message().contains(&peer_chunk_id), + "error message should reference the conflicting pending chunk; got: {}", + undo_err.message(), + ); + + // The approved chunk is still approved; the pending peer is still pending. + let approved_after = state + .store + .get_draft_chunk(&approved_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(approved_after.status, "approved"); + let peer_after = state + .store + .get_draft_chunk(&peer_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(peer_after.status, "pending"); + + // No new policy revision was created. 
+ let revisions_after_undo = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + revisions_after_undo.revisions[0].version, live_version_before, + "undo must not produce a new policy revision when it fails on the dedup slot" + ); + let _ = sandbox_id; + } + + /// The recovery path documented in the precondition error message + /// ("decide that pending chunk first") must actually unblock the undo. + /// Without this test, a future regression in `update_draft_chunk_status` + /// or in dedup-key recomputation could silently break operator recovery + /// while the collision test above still passes. + #[tokio::test] + async fn undo_succeeds_after_deciding_the_competing_pending_peer() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, RejectDraftChunkRequest, + SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-undo-recovery".to_string(), + name: "undo-recovery".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let approved_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap(); + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let peer_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + 
.clone(); + + // First undo blocks on the peer. + handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + + // Operator decides the peer (reject), then retries the undo. + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: peer_chunk_id.clone(), + reason: "superseded by undo".to_string(), + }), + ) + .await + .unwrap(); + + let undo = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap() + .into_inner(); + assert!(undo.policy_version > 0); + assert!(!undo.policy_hash.is_empty()); + + let approved_after = state + .store + .get_draft_chunk(&approved_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!( + approved_after.status, "pending", + "after recovery, the previously approved chunk must be pending" + ); + } + + /// Rejecting an approved chunk while a pending peer exists for the same + /// (host, port, binary) must refuse without touching the live policy. The + /// peer represents the operator's newer signal; reject silently stripping + /// the rule would create a confusing approval loop. + #[tokio::test] + async fn reject_approved_chunk_with_pending_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, RejectDraftChunkRequest, + SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-reject-peer".to_string(), + name: "reject-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let approved_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + }), + ) + .await + .unwrap(); + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + 
rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let peer_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + let reject_err = handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_chunk_id.clone(), + reason: "operator wants to revoke".to_string(), + }), + ) + .await + .unwrap_err(); + assert_eq!(reject_err.code(), Code::FailedPrecondition); + assert!(reject_err.message().contains(&peer_chunk_id)); + + let approved_after = state + .store + .get_draft_chunk(&approved_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(approved_after.status, "approved"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + } + + /// Approving a rejected chunk while a pending peer carries a newer + /// `proposed_rule` would push stale `allowed_ips` into the policy and + /// invite an order-dependent overwrite when the peer is later approved. + /// Refuse instead. + #[tokio::test] + async fn approve_rejected_chunk_with_pending_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, RejectDraftChunkRequest, + SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-approve-rejected-peer".to_string(), + name: "approve-rejected-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Seed a pending chunk, reject it (releasing its dedup slot). 
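+        // The reject below both records the decision and clears the chunk's
+        // dedup_key, so the follow-up submission can insert as a fresh
+        // pending row instead of being absorbed.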
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let rejected_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_chunk_id.clone(), + reason: "noisy first attempt".to_string(), + }), + ) + .await + .unwrap(); + + // Fresh denial creates a pending peer. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let peer_chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + let approve_err = handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(approve_err.code(), Code::FailedPrecondition); + assert!(approve_err.message().contains(&peer_chunk_id)); + + let rejected_after = state + .store + .get_draft_chunk(&rejected_chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!(rejected_after.status, "rejected"); + } + + /// Codex round-3 finding: `dedup_key=NULL` on decided chunks lets two + /// `approved` rows coexist for the same `(host, port, binary)`. The + /// pending-peer preflight does not detect this case — undo of the older + /// approved chunk would silently strip the rule that the newer approved + /// chunk installed (because `remove_chunk_from_policy` operates by + /// `rule_name` + `binary_path`, not chunk identity). 
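+    ///
+    /// The removal predicate is assumed to behave roughly like
+    /// `rules.retain(|r| !(r.name == rule_name
+    ///     && r.binaries.iter().any(|b| b.path == binary)))`,
+    /// which is why chunk identity never enters the decision of what gets
+    /// stripped.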
+ #[tokio::test] + async fn undo_approved_chunk_with_other_approved_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-decided-peer".to_string(), + name: "decided-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + let sandbox_id = sandbox.object_id().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Approve A — rule R installed with 10.0.5.10. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let first_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Fresh denial → pending B. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let second_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + // Approve B — this is the #1245 workflow (extend rule). Must succeed. + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: second_approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Both A and B are now approved; rule R reflects the merged state. 
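+        // Given the merge's last-writer-wins semantics, rule R's live body is
+        // now B's (10.0.5.99); A stays `approved` on record even though its
+        // payload was superseded.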
+ let chunk_a = state + .store + .get_draft_chunk(&first_approved_id) + .await + .unwrap() + .unwrap(); + let chunk_b = state + .store + .get_draft_chunk(&second_approved_id) + .await + .unwrap() + .unwrap(); + assert_eq!(chunk_a.status, "approved"); + assert_eq!(chunk_b.status, "approved"); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + // Undo A must refuse: undoing it would strip the rule B installed. + let undo_err = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(undo_err.code(), Code::FailedPrecondition); + assert!( + undo_err.message().contains(&second_approved_id), + "error must name the conflicting approved chunk; got: {}", + undo_err.message(), + ); + + // A must still be approved; no new policy revision. + let chunk_a_after = state + .store + .get_draft_chunk(&first_approved_id) + .await + .unwrap() + .unwrap(); + assert_eq!(chunk_a_after.status, "approved"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + let _ = sandbox_id; + } + + /// Sibling of `undo_approved_chunk_with_other_approved_peer_...` for the + /// approve path: approving a *rejected* chunk while another approved + /// chunk owns the same `(host, port, binary)` would push the rejected + /// chunk's (possibly stale) rule body through `merge_chunk_into_policy`, + /// overwriting the peer's contribution with last-writer-wins semantics. + /// The handler must refuse and name the conflicting peer. + #[tokio::test] + async fn approve_rejected_chunk_with_other_approved_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-approve-peer".to_string(), + name: "approve-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Approve A — rule R installed with 10.0.5.10. 
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Fresh denial → pending B for the same key. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let rejected_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + + // Reject B while still pending — this skips the approved-peer check + // (was_approved == false) and leaves B in `rejected` state while A + // is still approved. + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_id.clone(), + reason: "discarded".to_string(), + }), + ) + .await + .unwrap(); + let rejected_b = state + .store + .get_draft_chunk(&rejected_id) + .await + .unwrap() + .unwrap(); + assert_eq!(rejected_b.status, "rejected"); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + // Re-approve B (currently rejected) — must refuse: A's approved rule + // owns this key, and merging B would silently overwrite it. + let approve_err = handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: rejected_id.clone(), + }), + ) + .await + .unwrap_err(); + assert_eq!(approve_err.code(), Code::FailedPrecondition); + assert!( + approve_err.message().contains(&approved_id), + "error must name the conflicting approved chunk; got: {}", + approve_err.message(), + ); + + // B must still be rejected; no new policy revision. 
+ let rejected_after = state + .store + .get_draft_chunk(&rejected_id) + .await + .unwrap() + .unwrap(); + assert_eq!(rejected_after.status, "rejected"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + } + + /// Sibling of `undo_approved_chunk_with_other_approved_peer_...` for the + /// reject path: rejecting an approved chunk while another approved chunk + /// also contributes to the rule would call `remove_chunk_from_policy` + /// (which strips by `rule_name` + `binary_path`), stripping the peer's + /// contribution too. The handler must refuse and name the peer. + #[tokio::test] + async fn reject_approved_chunk_with_other_approved_peer_returns_failed_precondition() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-reject-peer".to_string(), + name: "reject-peer".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + // Approve A. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let first_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + }), + ) + .await + .unwrap(); + + // Fresh denial → pending B. Approve B (rule extended; both approved). 
+ handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "fresh denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let second_approved_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: second_approved_id.clone(), + }), + ) + .await + .unwrap(); + + let revisions_before = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + let version_before = revisions_before.revisions[0].version; + + // Reject A must refuse: rejecting it calls `remove_chunk_from_policy`, + // which strips by rule_name + binary_path and would remove B's + // contribution too. + let reject_err = handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: first_approved_id.clone(), + reason: "operator change of mind".to_string(), + }), + ) + .await + .unwrap_err(); + assert_eq!(reject_err.code(), Code::FailedPrecondition); + assert!( + reject_err.message().contains(&second_approved_id), + "error must name the conflicting approved chunk; got: {}", + reject_err.message(), + ); + + // A must still be approved; no new policy revision. + let chunk_a_after = state + .store + .get_draft_chunk(&first_approved_id) + .await + .unwrap() + .unwrap(); + assert_eq!(chunk_a_after.status, "approved"); + + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!(revisions_after.revisions[0].version, version_before); + } + + /// Round-3 finding: undo flips the chunk's status to `pending` BEFORE + /// calling `remove_chunk_from_policy`, so that a dedup-slot collision can + /// be detected on the partial unique index. If the policy mutation then + /// fails (DB error, version-retry exhaustion, validation), the chunk is + /// left in `pending` state while the rule is still live — and the + /// operator's natural recovery move (reject the pending chunk) hits the + /// `was_approved == false` branch that never calls + /// `remove_chunk_from_policy`, locking the rule in place permanently. + /// The handler must compensate by rolling the status back to `approved` + /// on remove failure. 
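+    ///
+    /// The intended compensation sequence, in outline:
+    ///   1. flip status approved → pending (a dedup collision surfaces here)
+    ///   2. remove the rule from the live policy
+    ///   3. on step-2 failure, flip status back to approved, preserving the
+    ///      original decided_at_ms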
+ #[tokio::test] + async fn undo_rolls_back_status_when_remove_chunk_from_policy_fails() { + use crate::persistence::Store; + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-orphan-rule".to_string(), + name: "orphan-rule".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let rule = NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec!["10.0.5.10".to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(rule.clone()), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + let chunk_id = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner() + .chunks[0] + .id + .clone(); + let approve_response = handle_approve_draft_chunk( + &state, + Request::new(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: chunk_id.clone(), + }), + ) + .await + .unwrap() + .into_inner(); + let version_after_approve = approve_response.policy_version; + let hash_after_approve = approve_response.policy_hash.clone(); + + let chunk_before = state + .store + .get_draft_chunk(&chunk_id) + .await + .unwrap() + .unwrap(); + let decided_at_before = chunk_before.decided_at_ms; + assert!(decided_at_before.is_some()); + + // Inject a failure into the next sandbox_policy revision insert. + // The approve above already wrote version 1; the undo's + // `remove_chunk_from_policy` will try to write version 2 and trip + // the trigger. The trigger raises a non-unique-violation error, so + // `apply_merge_operations_with_retry` returns Status::internal on + // its first attempt without entering the retry loop. + let sqlite_pool = match &*state.store { + Store::Sqlite(s) => s.pool().clone(), + Store::Postgres(_) => panic!("test requires sqlite-backed store"), + }; + sqlx::query( + r#" +CREATE TRIGGER fail_next_policy_revision +BEFORE INSERT ON "objects" +WHEN NEW.object_type = 'sandbox_policy' +BEGIN + SELECT RAISE(ABORT, 'simulated DB failure for orphan-rule test'); +END +"#, + ) + .execute(&sqlite_pool) + .await + .unwrap(); + + let undo_err = handle_undo_draft_chunk( + &state, + Request::new(UndoDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: chunk_id.clone(), + }), + ) + .await + .unwrap_err(); + // The error surface isn't the focus — any non-OK is acceptable. + // The important assertion is the post-failure state below. 
+ let _ = undo_err; + + // The chunk must NOT be left in `pending`. The rollback should + // restore `approved` with the original decided_at_ms preserved, so + // that subsequent operator actions (retry undo, reject, approve) + // behave the same as if the undo had never been attempted. + let chunk_after = state + .store + .get_draft_chunk(&chunk_id) + .await + .unwrap() + .unwrap(); + assert_eq!( + chunk_after.status, "approved", + "chunk must be rolled back to approved when remove_chunk_from_policy fails — \ + otherwise the rule is orphaned (live in policy, no approved chunk record)" + ); + assert_eq!( + chunk_after.decided_at_ms, decided_at_before, + "rollback must preserve the original decided_at_ms", + ); + + // The live policy must also be unchanged — no version 2 row written, + // and the approved revision's hash must still match what approve + // produced. Otherwise the rule body could drift even when the chunk + // status rolls back correctly. + let revisions_after = handle_list_sandbox_policies( + &state, + Request::new(ListSandboxPoliciesRequest { + name: sandbox_name.clone(), + limit: 10, + offset: 0, + global: false, + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + revisions_after.revisions.len(), + 1, + "no new policy revision should have been created when remove_chunk_from_policy failed" + ); + assert_eq!(revisions_after.revisions[0].version, version_after_approve); + assert_eq!(revisions_after.revisions[0].policy_hash, hash_after_approve); + + // Tear down the trigger so cleanup paths (if any) work. + sqlx::query("DROP TRIGGER fail_next_policy_revision") + .execute(&sqlite_pool) + .await + .unwrap(); + } + + /// Repeat denials with the same `(host, port, binary)` while a pending + /// chunk already exists must dedup — the existing pending row's `hit_count` + /// should accumulate rather than producing a fresh row each flush. 
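+    ///
+    /// The accumulation relies on the submit path's upsert hitting the
+    /// partial unique index over pending rows. A minimal sketch of that
+    /// shape, assuming the index covers ("object_type", "scope",
+    /// "dedup_key") where "dedup_key" is non-NULL; the real query binds
+    /// the full column set:
+    ///
+    /// ```ignore
+    /// let result = sqlx::query(
+    ///     r#"
+    /// INSERT INTO "objects" ("id", "object_type", "scope", "status", "dedup_key", "hit_count")
+    /// VALUES (?1, 'draft_policy_chunk', ?2, 'pending', ?3, ?4)
+    /// ON CONFLICT ("object_type", "scope", "dedup_key") WHERE "dedup_key" IS NOT NULL
+    /// DO UPDATE SET "hit_count" = "hit_count" + excluded."hit_count"
+    /// "#,
+    /// )
+    /// .bind(id)
+    /// .bind(sandbox_id)
+    /// .bind(dedup_key)
+    /// .bind(hit_count)
+    /// .execute(&pool)
+    /// .await?;
+    /// ```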
+ #[tokio::test] + async fn repeat_pending_denials_still_dedup() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-dedup-pending".to_string(), + name: "dedup-pending".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let proposed_rule = NetworkPolicyRule { + name: "allow_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "api.example.com".to_string(), + port: 443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + for hit in 1..=3 { + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_api".to_string(), + proposed_rule: Some(proposed_rule.clone()), + rationale: format!("denial #{hit}"), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100 + i64::from(hit), + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + } + + let pending = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name, + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + pending.chunks.len(), + 1, + "repeat pending denials should merge into a single pending chunk", + ); + assert_eq!(pending.chunks[0].hit_count, 3); + } + + /// Rejected chunks must release their dedup slot so a future denial for + /// the same `(host, port, binary)` can surface as a new pending chunk. 
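+    ///
+    /// The release itself is mechanical: the status UPDATE recomputes the
+    /// key via `draft_chunk_dedup_key_for_status` (added below in the
+    /// persistence layer), which yields a key only for pending rows:
+    ///
+    /// ```ignore
+    /// assert_eq!(
+    ///     draft_chunk_dedup_key_for_status("pending", "api.example.com", 443, "/usr/bin/curl"),
+    ///     Some("api.example.com|443|/usr/bin/curl".to_string()),
+    /// );
+    /// assert_eq!(
+    ///     draft_chunk_dedup_key_for_status("rejected", "api.example.com", 443, "/usr/bin/curl"),
+    ///     None,
+    /// );
+    /// ```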
+ #[tokio::test] + async fn rejected_chunk_does_not_block_new_pending() { + use openshell_core::proto::{ + GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, SandboxPhase, SandboxSpec, + }; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-rejected-release".to_string(), + name: "rejected-release".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let proposed_rule = NetworkPolicyRule { + name: "allow_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "api.example.com".to_string(), + port: 443, + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_api".to_string(), + proposed_rule: Some(proposed_rule.clone()), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + + let pending = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + let chunk_id = pending.chunks[0].id.clone(); + + handle_reject_draft_chunk( + &state, + Request::new(RejectDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id, + reason: "operator rejected".to_string(), + }), + ) + .await + .unwrap(); + + // A new denial for the same key after rejection should surface a + // fresh pending chunk — the rejected row no longer holds the slot. + handle_submit_policy_analysis( + &state, + Request::new(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_api".to_string(), + proposed_rule: Some(proposed_rule), + rationale: "denial after rejection".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }), + ) + .await + .unwrap(); + + let pending_after = handle_get_draft_policy( + &state, + Request::new(GetDraftPolicyRequest { + name: sandbox_name, + status_filter: "pending".to_string(), + }), + ) + .await + .unwrap() + .into_inner(); + assert_eq!( + pending_after.chunks.len(), + 1, + "rejected chunk should release its dedup slot for new pending chunks", + ); + } + + /// Walk through the issue #1245 scenario end-to-end through a real tonic + /// server bound to a local TCP port. Manual demo; not part of the default + /// test run because it binds a real socket and emits println! narrative. + /// Run with: + /// + /// `cargo test -p openshell-server --lib demo_stale_allowed_ips_via_grpc -- --ignored --nocapture` + /// + /// To watch the bug live, `git stash push -- crates/openshell-server/src/persistence/*.rs + /// crates/openshell-server/migrations` and rerun: the final assertion + /// fails and the narrative shows pending count = 0 after the stale-IP + /// denial. 
`git stash pop` restores the fix. + #[tokio::test] + #[ignore = "manual demo for issue #1245; run with --ignored --nocapture"] + async fn demo_stale_allowed_ips_via_grpc() { + use crate::OpenShellService; + use openshell_core::proto::open_shell_client::OpenShellClient; + use openshell_core::proto::open_shell_server::OpenShellServer; + use openshell_core::proto::{ + ApproveDraftChunkRequest, GetDraftPolicyRequest, NetworkBinary, NetworkEndpoint, + SandboxPhase, SandboxSpec, SubmitPolicyAnalysisRequest, + }; + use tokio::net::TcpListener; + use tokio_stream::wrappers::TcpListenerStream; + + let state = test_server_state().await; + let sandbox = Sandbox { + metadata: Some(openshell_core::proto::datamodel::v1::ObjectMeta { + id: "sb-demo-1245".to_string(), + name: "demo-1245".to_string(), + created_at_ms: 1_000_000, + labels: std::collections::HashMap::new(), + }), + spec: Some(SandboxSpec { + policy: None, + ..Default::default() + }), + phase: SandboxPhase::Ready as i32, + ..Default::default() + }; + state.store.put_message(&sandbox).await.unwrap(); + let sandbox_name = sandbox.object_name().to_string(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server_handle = tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(OpenShellServer::new(OpenShellService::new(state))) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .unwrap(); + }); + + let endpoint = format!("http://{addr}"); + let mut client = OpenShellClient::connect(endpoint.clone()).await.unwrap(); + + println!(); + println!("=== Demo: stale allowed_ips fix (issue #1245) ==="); + println!("gateway listening on {addr}"); + println!("sandbox: {sandbox_name}"); + println!(); + + let make_rule = |ip: &str| NetworkPolicyRule { + name: "allow_internal_api".to_string(), + endpoints: vec![NetworkEndpoint { + host: "internal-api.example.com".to_string(), + port: 443, + allowed_ips: vec![ip.to_string()], + ..Default::default() + }], + binaries: vec![NetworkBinary { + path: "/usr/bin/curl".to_string(), + ..Default::default() + }], + }; + + let pending_filter = || GetDraftPolicyRequest { + name: sandbox_name.clone(), + status_filter: "pending".to_string(), + }; + + println!("[1] supervisor reports first denial (resolved IP 10.0.5.10)"); + let resp = client + .submit_policy_analysis(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.10")), + rationale: "first denial".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }) + .await + .unwrap() + .into_inner(); + println!( + " accepted={} rejected={}", + resp.accepted_chunks, resp.rejected_chunks + ); + + let pending = client + .get_draft_policy(pending_filter()) + .await + .unwrap() + .into_inner(); + println!(" pending chunks: {}", pending.chunks.len()); + let chunk_id = pending.chunks[0].id.clone(); + println!(); + + println!("[2] operator approves chunk {chunk_id}"); + client + .approve_draft_chunk(ApproveDraftChunkRequest { + name: sandbox_name.clone(), + chunk_id: chunk_id.clone(), + }) + .await + .unwrap(); + let pending = client + .get_draft_policy(pending_filter()) + .await + .unwrap() + .into_inner(); + println!( + " pending chunks: {} (rule is now live)", + pending.chunks.len() + ); + println!(); + + println!("[3] 
backend flips to 10.0.5.99 — supervisor reports fresh denial"); + println!(" same (host, port, binary) but allowed_ips=[10.0.5.99]"); + client + .submit_policy_analysis(SubmitPolicyAnalysisRequest { + name: sandbox_name.clone(), + proposed_chunks: vec![PolicyChunk { + rule_name: "allow_internal_api".to_string(), + proposed_rule: Some(make_rule("10.0.5.99")), + rationale: "denial after backend flip".to_string(), + confidence: 0.9, + hit_count: 1, + first_seen_ms: 200, + last_seen_ms: 200, + binary: "/usr/bin/curl".to_string(), + ..Default::default() + }], + ..Default::default() + }) + .await + .unwrap(); + + let pending = client + .get_draft_policy(pending_filter()) + .await + .unwrap() + .into_inner(); + println!(" pending chunks: {}", pending.chunks.len()); + println!(); + if pending.chunks.is_empty() { + println!( + "BUG REPRODUCED: stale-IP denial was absorbed by the approved chunk's \ + dedup slot. Operator has no in-product signal." + ); + } else { + println!( + "FIX VERIFIED: stale-IP denial surfaced as a fresh pending chunk \ + (rationale: {:?})", + pending.chunks[0].rationale, + ); + } + println!(); + + server_handle.abort(); + + assert!( + !pending.chunks.is_empty(), + "expected the stale-IP denial to surface a pending chunk after the fix", + ); + } } diff --git a/crates/openshell-server/src/persistence/postgres.rs b/crates/openshell-server/src/persistence/postgres.rs index 2cd6a046f..a77768524 100644 --- a/crates/openshell-server/src/persistence/postgres.rs +++ b/crates/openshell-server/src/persistence/postgres.rs @@ -440,6 +440,68 @@ WHERE object_type = $1 AND id = $2 row.map(row_to_draft_chunk_record).transpose() } + pub async fn find_pending_draft_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + ) -> PersistenceResult> { + let dedup_key = draft_chunk_dedup_key_for_status("pending", host, port, binary); + let row = sqlx::query( + r" +SELECT id, scope, status, hit_count, payload, created_at_ms, updated_at_ms +FROM objects +WHERE object_type = $1 AND scope = $2 AND status = 'pending' AND dedup_key = $3 +", + ) + .bind(DRAFT_CHUNK_OBJECT_TYPE) + .bind(sandbox_id) + .bind(dedup_key) + .fetch_optional(&self.pool) + .await + .map_err(|e| map_db_error(&e))?; + + row.map(row_to_draft_chunk_record).transpose() + } + + /// Find any other approved draft chunk for `(sandbox_id, host, port, binary)` + /// excluding the chunk identified by `exclude_chunk_id`. Approved chunks + /// have `dedup_key=NULL` (issue #1245), so the partial unique index does + /// not constrain them — multiple approved chunks can coexist for the same + /// key. Callers that intend to mutate a rule contributed to by one chunk + /// use this to detect when another decided chunk is also contributing. 
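+    ///
+    /// A sketch of the intended call site in the decision handlers, with
+    /// hypothetical locals (the real handlers also fold the operation
+    /// name into the error message):
+    ///
+    /// ```ignore
+    /// if let Some(peer) = store
+    ///     .find_other_approved_chunk_for_key(&sandbox_id, &chunk.host, chunk.port, &chunk.binary, &chunk.id)
+    ///     .await?
+    /// {
+    ///     return Err(Status::failed_precondition(format!(
+    ///         "rule is also contributed to by approved chunk {}; decide it first",
+    ///         peer.id,
+    ///     )));
+    /// }
+    /// ```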
+ pub async fn find_other_approved_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + exclude_chunk_id: &str, + ) -> PersistenceResult> { + let rows = sqlx::query( + r" +SELECT id, scope, status, hit_count, payload, created_at_ms, updated_at_ms +FROM objects +WHERE object_type = $1 AND scope = $2 AND status = 'approved' AND id != $3 +", + ) + .bind(DRAFT_CHUNK_OBJECT_TYPE) + .bind(sandbox_id) + .bind(exclude_chunk_id) + .fetch_all(&self.pool) + .await + .map_err(|e| map_db_error(&e))?; + + for row in rows { + let record = row_to_draft_chunk_record(row)?; + if record.host == host && record.port == port && record.binary == binary { + return Ok(Some(record)); + } + } + Ok(None) + } + pub async fn list_draft_chunks( &self, sandbox_id: &str, @@ -492,11 +554,12 @@ ORDER BY created_at_ms DESC record.decided_at_ms = decided_at_ms; record.last_seen_ms = current_time_ms()?; let payload = draft_chunk_payload_from_record(&record)?; + let dedup_key = draft_chunk_dedup_key(&record); let result = sqlx::query( r" UPDATE objects -SET status = $3, payload = $4, updated_at_ms = $5 +SET status = $3, payload = $4, updated_at_ms = $5, dedup_key = $6 WHERE object_type = $1 AND id = $2 ", ) @@ -505,6 +568,7 @@ WHERE object_type = $1 AND id = $2 .bind(status) .bind(payload) .bind(record.last_seen_ms) + .bind(dedup_key) .execute(&self.pool) .await .map_err(|e| map_db_error(&e))?; @@ -597,8 +661,20 @@ WHERE object_type = $1 AND scope = $2 } } -fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> String { - format!("{}|{}|{}", chunk.host, chunk.port, chunk.binary) +fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> Option { + draft_chunk_dedup_key_for_status(&chunk.status, &chunk.host, chunk.port, &chunk.binary) +} + +fn draft_chunk_dedup_key_for_status( + status: &str, + host: &str, + port: i32, + binary: &str, +) -> Option { + // Only pending chunks participate in dedup. Approved and rejected chunks + // get NULL so they don't absorb future submissions for the same + // (host, port, binary) — see issue #1245. + (status == "pending").then(|| format!("{host}|{port}|{binary}")) } fn row_to_object_record(row: sqlx::postgres::PgRow) -> ObjectRecord { diff --git a/crates/openshell-server/src/persistence/sqlite.rs b/crates/openshell-server/src/persistence/sqlite.rs index fafb07597..f6d9f2aba 100644 --- a/crates/openshell-server/src/persistence/sqlite.rs +++ b/crates/openshell-server/src/persistence/sqlite.rs @@ -24,6 +24,13 @@ pub struct SqliteStore { } impl SqliteStore { + /// Test-only accessor for raw pool access (e.g., installing failure + /// triggers to drive fault-injection tests in sibling modules). 
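+    ///
+    /// For example, the undo fault-injection test in the gRPC module
+    /// installs an abort trigger straight on the pool:
+    ///
+    /// ```ignore
+    /// sqlx::query(
+    ///     r#"
+    /// CREATE TRIGGER fail_next_policy_revision
+    /// BEFORE INSERT ON "objects"
+    /// WHEN NEW.object_type = 'sandbox_policy'
+    /// BEGIN
+    ///     SELECT RAISE(ABORT, 'simulated DB failure for orphan-rule test');
+    /// END
+    /// "#,
+    /// )
+    /// .execute(store.pool())
+    /// .await?;
+    /// ```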
+ #[cfg(test)] + pub(crate) fn pool(&self) -> &SqlitePool { + &self.pool + } + pub async fn connect(url: &str) -> PersistenceResult { let is_in_memory = url.contains(":memory:") || url.contains("mode=memory"); let max_connections = if is_in_memory { 1 } else { 5 }; @@ -455,6 +462,68 @@ WHERE "object_type" = ?1 AND "id" = ?2 row.map(row_to_draft_chunk_record).transpose() } + pub async fn find_pending_draft_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + ) -> PersistenceResult> { + let dedup_key = draft_chunk_dedup_key_for_status("pending", host, port, binary); + let row = sqlx::query( + r#" +SELECT "id", "scope", "status", "hit_count", "payload", "created_at_ms", "updated_at_ms" +FROM "objects" +WHERE "object_type" = ?1 AND "scope" = ?2 AND "status" = 'pending' AND "dedup_key" = ?3 +"#, + ) + .bind(DRAFT_CHUNK_OBJECT_TYPE) + .bind(sandbox_id) + .bind(dedup_key) + .fetch_optional(&self.pool) + .await + .map_err(|e| map_db_error(&e))?; + + row.map(row_to_draft_chunk_record).transpose() + } + + /// Find any other approved draft chunk for `(sandbox_id, host, port, binary)` + /// excluding the chunk identified by `exclude_chunk_id`. Approved chunks + /// have `dedup_key=NULL` (issue #1245), so the partial unique index does + /// not constrain them — multiple approved chunks can coexist for the same + /// key. Callers that intend to mutate a rule contributed to by one chunk + /// use this to detect when another decided chunk is also contributing. + pub async fn find_other_approved_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + exclude_chunk_id: &str, + ) -> PersistenceResult> { + let rows = sqlx::query( + r#" +SELECT "id", "scope", "status", "hit_count", "payload", "created_at_ms", "updated_at_ms" +FROM "objects" +WHERE "object_type" = ?1 AND "scope" = ?2 AND "status" = 'approved' AND "id" != ?3 +"#, + ) + .bind(DRAFT_CHUNK_OBJECT_TYPE) + .bind(sandbox_id) + .bind(exclude_chunk_id) + .fetch_all(&self.pool) + .await + .map_err(|e| map_db_error(&e))?; + + for row in rows { + let record = row_to_draft_chunk_record(row)?; + if record.host == host && record.port == port && record.binary == binary { + return Ok(Some(record)); + } + } + Ok(None) + } + pub async fn list_draft_chunks( &self, sandbox_id: &str, @@ -507,11 +576,12 @@ ORDER BY "created_at_ms" DESC record.decided_at_ms = decided_at_ms; record.last_seen_ms = current_time_ms()?; let payload = draft_chunk_payload_from_record(&record)?; + let dedup_key = draft_chunk_dedup_key(&record); let result = sqlx::query( r#" UPDATE "objects" -SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5 +SET "status" = ?3, "payload" = ?4, "updated_at_ms" = ?5, "dedup_key" = ?6 WHERE "object_type" = ?1 AND "id" = ?2 "#, ) @@ -520,6 +590,7 @@ WHERE "object_type" = ?1 AND "id" = ?2 .bind(status) .bind(payload) .bind(record.last_seen_ms) + .bind(dedup_key) .execute(&self.pool) .await .map_err(|e| map_db_error(&e))?; @@ -612,8 +683,20 @@ WHERE "object_type" = ?1 AND "scope" = ?2 } } -fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> String { - format!("{}|{}|{}", chunk.host, chunk.port, chunk.binary) +fn draft_chunk_dedup_key(chunk: &DraftChunkRecord) -> Option { + draft_chunk_dedup_key_for_status(&chunk.status, &chunk.host, chunk.port, &chunk.binary) +} + +fn draft_chunk_dedup_key_for_status( + status: &str, + host: &str, + port: i32, + binary: &str, +) -> Option { + // Only pending chunks participate in dedup. 
Approved and rejected chunks + // get NULL so they don't absorb future submissions for the same + // (host, port, binary) — see issue #1245. + (status == "pending").then(|| format!("{host}|{port}|{binary}")) } fn row_to_object_record(row: sqlx::sqlite::SqliteRow) -> ObjectRecord { @@ -656,3 +739,280 @@ fn row_to_draft_chunk_record(row: sqlx::sqlite::SqliteRow) -> PersistenceResult< updated_at_ms, ) } + +#[cfg(test)] +mod tests { + use super::*; + + async fn fetch_dedup_key(store: &SqliteStore, id: &str) -> Option { + sqlx::query_scalar(r#"SELECT "dedup_key" FROM "objects" WHERE "id" = ?1"#) + .bind(id) + .fetch_one(&store.pool) + .await + .unwrap() + } + + fn make_chunk(id: &str, sandbox_id: &str, status: &str) -> DraftChunkRecord { + DraftChunkRecord { + id: id.to_string(), + sandbox_id: sandbox_id.to_string(), + draft_version: 1, + status: status.to_string(), + rule_name: "allow_internal_api".to_string(), + proposed_rule: Vec::new(), + rationale: "test".to_string(), + security_notes: String::new(), + confidence: 0.9, + created_at_ms: 100, + decided_at_ms: None, + host: "internal-api.example.com".to_string(), + port: 443, + binary: "/usr/bin/curl".to_string(), + hit_count: 1, + first_seen_ms: 100, + last_seen_ms: 100, + } + } + + /// Pending chunks carry a `dedup_key`; approving must clear it so a fresh + /// pending submission for the same key can land as a new row. Issue #1245. + #[tokio::test] + async fn approving_pending_chunk_clears_dedup_key() { + let store = SqliteStore::connect("sqlite::memory:").await.unwrap(); + store.migrate().await.unwrap(); + + let chunk = make_chunk("chunk-1", "sb-1", "pending"); + store.put_draft_chunk(&chunk).await.unwrap(); + assert_eq!( + fetch_dedup_key(&store, "chunk-1").await, + Some("internal-api.example.com|443|/usr/bin/curl".to_string()), + "pending chunk must hold its dedup slot" + ); + + store + .update_draft_chunk_status("chunk-1", "approved", Some(200)) + .await + .unwrap(); + assert_eq!( + fetch_dedup_key(&store, "chunk-1").await, + None, + "approved chunk must release its dedup slot" + ); + } + + /// Rejected chunks must also release their dedup slot. Otherwise a future + /// denial for the same (host, port, binary) is silently absorbed. + #[tokio::test] + async fn rejecting_pending_chunk_clears_dedup_key() { + let store = SqliteStore::connect("sqlite::memory:").await.unwrap(); + store.migrate().await.unwrap(); + + let chunk = make_chunk("chunk-1", "sb-1", "pending"); + store.put_draft_chunk(&chunk).await.unwrap(); + + store + .update_draft_chunk_status("chunk-1", "rejected", Some(200)) + .await + .unwrap(); + assert_eq!( + fetch_dedup_key(&store, "chunk-1").await, + None, + "rejected chunk must release its dedup slot" + ); + } + + /// `find_pending_draft_chunk_for_key` returns the pending peer for a + /// (sandbox, host, port, binary) tuple and ignores decided chunks. + #[tokio::test] + async fn find_pending_draft_chunk_for_key_returns_pending_peer() { + let store = SqliteStore::connect("sqlite::memory:").await.unwrap(); + store.migrate().await.unwrap(); + + let approved = make_chunk("chunk-approved", "sb-1", "pending"); + store.put_draft_chunk(&approved).await.unwrap(); + store + .update_draft_chunk_status("chunk-approved", "approved", Some(150)) + .await + .unwrap(); + + // No peer yet — only the now-approved chunk exists. 
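+        // Approval cleared its dedup_key (issue #1245), so the
+        // status = 'pending' lookup must come back empty instead of
+        // surfacing the decided row.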
+ assert!( + store + .find_pending_draft_chunk_for_key( + "sb-1", + "internal-api.example.com", + 443, + "/usr/bin/curl" + ) + .await + .unwrap() + .is_none() + ); + + let pending = make_chunk("chunk-pending", "sb-1", "pending"); + store.put_draft_chunk(&pending).await.unwrap(); + let peer = store + .find_pending_draft_chunk_for_key( + "sb-1", + "internal-api.example.com", + 443, + "/usr/bin/curl", + ) + .await + .unwrap() + .expect("pending peer must be found"); + assert_eq!(peer.id, "chunk-pending"); + } + + /// `find_other_approved_chunk_for_key` filters approved rows in Rust by + /// deserializing each payload, so a wrong field comparison would silently + /// match unrelated rows and block legitimate operator actions. Cover the + /// four near-miss axes (sandbox, host, port, binary) plus the + /// exclude-self guard and a positive match. + #[tokio::test] + async fn find_other_approved_chunk_for_key_ignores_unrelated_chunks() { + async fn put_approved(store: &SqliteStore, chunk: DraftChunkRecord) { + let id = chunk.id.clone(); + store.put_draft_chunk(&chunk).await.unwrap(); + store + .update_draft_chunk_status(&id, "approved", Some(200)) + .await + .unwrap(); + } + + let store = SqliteStore::connect("sqlite::memory:").await.unwrap(); + store.migrate().await.unwrap(); + + // Self — must be excluded by id. + put_approved(&store, make_chunk("chunk-self", "sb-1", "pending")).await; + // Different sandbox. + put_approved(&store, make_chunk("chunk-other-sandbox", "sb-2", "pending")).await; + // Different host. + let mut other_host = make_chunk("chunk-other-host", "sb-1", "pending"); + other_host.host = "different-host.example.com".to_string(); + put_approved(&store, other_host).await; + // Different port. + let mut other_port = make_chunk("chunk-other-port", "sb-1", "pending"); + other_port.port = 8443; + put_approved(&store, other_port).await; + // Different binary. + let mut other_binary = make_chunk("chunk-other-binary", "sb-1", "pending"); + other_binary.binary = "/usr/bin/wget".to_string(); + put_approved(&store, other_binary).await; + + // Nothing else approved matches → None. + assert!( + store + .find_other_approved_chunk_for_key( + "sb-1", + "internal-api.example.com", + 443, + "/usr/bin/curl", + "chunk-self", + ) + .await + .unwrap() + .is_none(), + "must not match different sandbox/host/port/binary or self" + ); + + // Add a true peer and confirm it is returned. + put_approved(&store, make_chunk("chunk-peer", "sb-1", "pending")).await; + let peer = store + .find_other_approved_chunk_for_key( + "sb-1", + "internal-api.example.com", + 443, + "/usr/bin/curl", + "chunk-self", + ) + .await + .unwrap() + .expect("matching approved peer must be returned"); + assert_eq!(peer.id, "chunk-peer"); + + // And excluding the peer instead returns self (the only remaining match). + let self_match = store + .find_other_approved_chunk_for_key( + "sb-1", + "internal-api.example.com", + 443, + "/usr/bin/curl", + "chunk-peer", + ) + .await + .unwrap() + .expect("with the peer excluded, self must match"); + assert_eq!(self_match.id, "chunk-self"); + } + + /// Migration 005 clears `dedup_key` from any decided rows seeded before + /// the runtime fix landed, but must leave pending rows alone. The test + /// loads the migration SQL directly from the file (via `include_str!`) + /// so a drift between the file and the test's expectations would be + /// caught — and seeds one row of each status, including a pending row + /// that the migration must NOT touch. 
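+    ///
+    /// The script runs through `sqlx::raw_sql`, which executes the file
+    /// verbatim (leading SQL comments and all) as unprepared SQL, closer
+    /// to how a migration runner applies it than a prepared `sqlx::query`
+    /// would be.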
+ #[tokio::test] + async fn migration_clears_dedup_key_on_legacy_decided_rows() { + const MIGRATION_005: &str = + include_str!("../../migrations/sqlite/005_clear_dedup_for_decided_chunks.sql"); + + let store = SqliteStore::connect("sqlite::memory:").await.unwrap(); + store.migrate().await.unwrap(); + + let seed = |id: &'static str, status: &'static str, dedup: &'static str| { + let pool = store.pool.clone(); + async move { + sqlx::query( + r#" +INSERT INTO "objects" ( + "id", "object_type", "scope", "status", "dedup_key", + "hit_count", "payload", "created_at_ms", "updated_at_ms" +) +VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) +"#, + ) + .bind(id) + .bind(DRAFT_CHUNK_OBJECT_TYPE) + .bind("sb-legacy") + .bind(status) + .bind(dedup) + .bind(1_i64) + .bind(Vec::::new()) + .bind(100_i64) + .bind(100_i64) + .execute(&pool) + .await + .unwrap(); + } + }; + + // Pre-fix gateway: every chunk carries a dedup_key regardless of + // status. Use distinct keys so all three rows can coexist under the + // partial unique index. + seed("legacy-approved", "approved", "host-a|443|/usr/bin/curl").await; + seed("legacy-rejected", "rejected", "host-r|443|/usr/bin/curl").await; + seed("inflight-pending", "pending", "host-p|443|/usr/bin/curl").await; + + sqlx::raw_sql(MIGRATION_005) + .execute(&store.pool) + .await + .unwrap(); + + assert_eq!( + fetch_dedup_key(&store, "legacy-approved").await, + None, + "migration must clear dedup_key on approved legacy rows" + ); + assert_eq!( + fetch_dedup_key(&store, "legacy-rejected").await, + None, + "migration must clear dedup_key on rejected legacy rows" + ); + assert_eq!( + fetch_dedup_key(&store, "inflight-pending").await, + Some("host-p|443|/usr/bin/curl".to_string()), + "migration must NOT clear dedup_key on in-flight pending rows" + ); + } +} diff --git a/crates/openshell-server/src/policy_store.rs b/crates/openshell-server/src/policy_store.rs index f0a43698e..3c3c4bd3d 100644 --- a/crates/openshell-server/src/policy_store.rs +++ b/crates/openshell-server/src/policy_store.rs @@ -58,6 +58,23 @@ pub trait PolicyStoreExt { async fn get_draft_chunk(&self, id: &str) -> PersistenceResult>; + async fn find_pending_draft_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + ) -> PersistenceResult>; + + async fn find_other_approved_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + exclude_chunk_id: &str, + ) -> PersistenceResult>; + async fn list_draft_chunks( &self, sandbox_id: &str, @@ -200,6 +217,61 @@ impl PolicyStoreExt for Store { } } + async fn find_pending_draft_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + ) -> PersistenceResult> { + match self { + Self::Postgres(store) => { + store + .find_pending_draft_chunk_for_key(sandbox_id, host, port, binary) + .await + } + Self::Sqlite(store) => { + store + .find_pending_draft_chunk_for_key(sandbox_id, host, port, binary) + .await + } + } + } + + async fn find_other_approved_chunk_for_key( + &self, + sandbox_id: &str, + host: &str, + port: i32, + binary: &str, + exclude_chunk_id: &str, + ) -> PersistenceResult> { + match self { + Self::Postgres(store) => { + store + .find_other_approved_chunk_for_key( + sandbox_id, + host, + port, + binary, + exclude_chunk_id, + ) + .await + } + Self::Sqlite(store) => { + store + .find_other_approved_chunk_for_key( + sandbox_id, + host, + port, + binary, + exclude_chunk_id, + ) + .await + } + } + } + async fn list_draft_chunks( &self, sandbox_id: &str,